From b4f2a5c1780116110291996a1e79b9166f0691e9 Mon Sep 17 00:00:00 2001
From: Joe <4088382+JoeStech@users.noreply.github.com>
Date: Fri, 20 Mar 2026 15:53:24 -0600
Subject: [PATCH 1/3] full rebuild of knowledge base code with hybrid search
and ampere docs
---
embedding-generation/Dockerfile | 14 +-
embedding-generation/document_chunking.py | 520 ++++++++++++++++
embedding-generation/eval_questions.json | 85 +++
embedding-generation/evaluate_retrieval.py | 122 ++++
embedding-generation/generate-chunks.py | 554 +++++++++++-------
.../local_vectorstore_creation.py | 65 +-
embedding-generation/requirements.txt | 4 +-
.../tests/test_generate_chunks.py | 67 ++-
embedding-generation/vector-db-sources.csv | 88 +++
mcp-local/Dockerfile | 12 +-
mcp-local/requirements.txt | 3 +-
mcp-local/server.py | 42 +-
mcp-local/utils/search_utils.py | 290 ++++++---
13 files changed, 1539 insertions(+), 327 deletions(-)
create mode 100644 embedding-generation/document_chunking.py
create mode 100644 embedding-generation/eval_questions.json
create mode 100644 embedding-generation/evaluate_retrieval.py
diff --git a/embedding-generation/Dockerfile b/embedding-generation/Dockerfile
index dc88a56..4909565 100644
--- a/embedding-generation/Dockerfile
+++ b/embedding-generation/Dockerfile
@@ -19,9 +19,15 @@ FROM ${EMBEDDING_BASE_IMAGE} AS intrinsic-chunks
FROM ubuntu:24.04 AS builder
+ARG SOURCES_FILE=vector-db-sources.csv
+ARG EMBEDDING_MODEL=all-MiniLM-L6-v2
+
ENV DEBIAN_FRONTEND=noninteractive \
PIP_INDEX_URL=https://download.pytorch.org/whl/cpu \
- PIP_EXTRA_INDEX_URL=https://pypi.org/simple
+ PIP_EXTRA_INDEX_URL=https://pypi.org/simple \
+ SENTENCE_TRANSFORMER_MODEL=${EMBEDDING_MODEL} \
+ HF_HOME=/embedding-data/.cache/huggingface \
+ SENTENCE_TRANSFORMERS_HOME=/embedding-data/.cache/sentence_transformers
# Install Python
RUN apt-get update && apt-get install -y --no-install-recommends \
@@ -32,6 +38,7 @@ WORKDIR /embedding-data
# Copy Python scripts and dependencies
COPY generate-chunks.py .
+COPY document_chunking.py .
COPY local_vectorstore_creation.py .
COPY vector-db-sources.csv .
COPY requirements.txt .
@@ -42,8 +49,11 @@ COPY --from=intrinsic-chunks /embedding-data/intrinsic_chunks ./intrinsic_chunks
# Install Python dependencies (force CPU-only torch)
RUN pip3 install --no-cache-dir --break-system-packages -r requirements.txt
+# Pre-download the embedding model so local/offline loads succeed later in the build.
+RUN python3 -c "from sentence_transformers import SentenceTransformer; import os; SentenceTransformer(os.environ['SENTENCE_TRANSFORMER_MODEL'], cache_folder=os.environ['SENTENCE_TRANSFORMERS_HOME'])"
+
# Generate vector database
-RUN python3 generate-chunks.py vector-db-sources.csv && \
+RUN python3 generate-chunks.py ${SOURCES_FILE} && \
python3 local_vectorstore_creation.py && \
rm -f embeddings_*.txt
diff --git a/embedding-generation/document_chunking.py b/embedding-generation/document_chunking.py
new file mode 100644
index 0000000..43a6c99
--- /dev/null
+++ b/embedding-generation/document_chunking.py
@@ -0,0 +1,520 @@
+"""Utilities for parsing documentation sources into retrieval-friendly chunks."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from io import BytesIO
+import math
+import re
+from typing import Dict, Iterable, List, Optional
+from urllib.parse import urlparse
+
+from bs4 import BeautifulSoup
+from pypdf import PdfReader
+
+
# Words or single punctuation marks; cheap proxy units for token estimation.
TOKEN_PATTERN = re.compile(r"\w+|[^\w\s]", re.UNICODE)
# Whitespace-delimited words; used for overlap tails and word chopping.
WORD_PATTERN = re.compile(r"\S+")
# Split points after sentence-ending punctuation followed by whitespace.
SENTENCE_SPLIT_PATTERN = re.compile(r"(?<=[.!?])\s+")
# ATX headings: group 1 is the '#' run (level), group 2 is the heading text.
MARKDOWN_HEADING_PATTERN = re.compile(r"^(#{1,6})\s+(.*)$")
# Opening/closing code-fence markers (backtick or tilde style).
MARKDOWN_FENCE_PATTERN = re.compile(r"^(```|~~~)")
# Tag names h1..h6.
HTML_HEADING_TAGS = {f"h{level}" for level in range(1, 7)}
# Block-level tags whose text the HTML parser extracts.
HTML_BLOCK_TAGS = HTML_HEADING_TAGS | {"p", "li", "pre", "code", "table"}
# Site-chrome lines (navigation, footers, login prompts) dropped during
# parsing; each pattern is matched case-insensitively against a cleaned line.
BOILERPLATE_LINE_PATTERNS = [
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"^register\s*login$",
        r"^english\s*chinese$",
        r"^about\s*\|\s*contact us\s*\|\s*privacy\s*\|\s*sitemap$",
        r"^this site runs on ampere processors\.?$",
        r"^created at\s*:",
        r"^last updated at\s*:",
        r"^copy$",
        r"^table of contents$",
        r"^on this page$",
        r"^skip to content$",
        r"^sign in$",
        r"^sign up$",
        r"^all rights reserved\.?$",
        r"^ampere computing llc$",
        r"^products solutions developers support resources company$",
    )
]
+
+
@dataclass
class Block:
    """One extracted content unit: a paragraph or a fenced code snippet."""

    # Either "paragraph" or "code" (assigned by the parse_* functions).
    kind: str
    text: str


@dataclass
class Section:
    """A run of blocks grouped under a single heading path."""

    # Outline position, outermost heading first.
    heading_path: List[str]
    blocks: List[Block]


@dataclass
class ParsedDocument:
    """A fully parsed source document, ready for chunking."""

    # URL as listed in the sources CSV.
    source_url: str
    # URL that was actually fetched (e.g. a raw.githubusercontent rewrite).
    resolved_url: str
    display_title: str
    # One of "markdown", "html", or "pdf".
    content_type: str
    sections: List[Section]
+
+
def normalize_source_url(url: str) -> str:
    """Return *url* trimmed and unwrapped from chrome-extension prefixes."""
    cleaned = (url or "").strip()
    is_wrapped = cleaned.startswith("chrome-extension://") and "https:/" in cleaned
    if not is_wrapped:
        return cleaned
    tail = cleaned.split("https:/", 1)[1]
    return "https://" + tail.lstrip("/")
+
+
def source_to_fetch_url(url: str) -> str:
    """Resolve a source URL into a directly fetchable content URL.

    Known learning-path pages and GitHub "blob" pages are rewritten to
    their raw-content equivalents; anything unrecognized (or malformed)
    is returned unchanged after normalize_source_url cleanup.
    """
    url = normalize_source_url(url)
    if url == "https://learn.arm.com/migration":
        return (
            "https://raw.githubusercontent.com/ArmDeveloperEcosystem/"
            "arm-learning-paths/refs/heads/main/content/migration/_index.md"
        )
    # Guard on "/main/": without it the split below raised IndexError for
    # graviton URLs that point at another branch or at the repo root.
    if "/github.com/aws/aws-graviton-getting-started/" in url and "/main/" in url:
        specific_content = url.split("/main/", 1)[1]
        return (
            "https://raw.githubusercontent.com/aws/aws-graviton-getting-started/"
            f"refs/heads/main/{specific_content}"
        )
    if url.startswith("https://github.com/") and "/blob/" in url:
        owner_repo, path = url.split("/blob/", 1)
        # Need "<branch>/<file path>" after /blob/; a bare branch has no file.
        if "/" in path:
            branch, relative_path = path.split("/", 1)
            raw_base = owner_repo.replace(
                "https://github.com/", "https://raw.githubusercontent.com/"
            )
            return f"{raw_base}/{branch}/{relative_path}"
    return url
+
+
def estimate_tokens(text: str) -> int:
    """Roughly estimate how many model tokens *text* occupies (for sizing)."""
    if not text:
        return 0
    # Words and standalone punctuation, discounted to approximate subwords.
    token_count = len(re.findall(r"\w+|[^\w\s]", text, re.UNICODE))
    return math.ceil(token_count * 0.85)
+
+
def clean_text(text: str) -> str:
    """Normalize to Unix newlines, single spaces, and at most one blank line."""
    unified_newlines = text.replace("\r\n", "\n").replace("\r", "\n")
    collapsed_spaces = re.sub(r"[ \t]+", " ", unified_newlines)
    collapsed_blanks = re.sub(r"\n{3,}", "\n\n", collapsed_spaces)
    return collapsed_blanks.strip()
+
+
def is_boilerplate_line(line: str) -> bool:
    """Return True when *line* is navigation/footer noise rather than content."""
    normalized = clean_text(line)
    if not normalized:
        return False
    generic_noise = (
        r"©\s*\d{4}.*",      # copyright footers
        r"\d+\s*/\s*\d+",    # "page x / y" counters
        r"\d+",              # bare page numbers
    )
    if any(re.fullmatch(pattern, normalized) for pattern in generic_noise):
        return True
    return any(pattern.match(normalized) for pattern in BOILERPLATE_LINE_PATTERNS)
+
+
def strip_frontmatter(markdown: str) -> str:
    """Drop a leading YAML frontmatter block (``--- ... ---``) if present."""
    text = markdown.lstrip("\ufeff")
    if not text.startswith("---"):
        return text
    closing = text.find("\n---", 3)
    if closing == -1:
        # Unterminated frontmatter: leave the document untouched.
        return text
    return text[closing + 4 :].lstrip()
+
+
def normalize_heading_path(title: str, heading_path: List[str]) -> List[str]:
    """Clean heading names and drop a leading heading that repeats the title."""
    cleaned: List[str] = []
    for part in heading_path:
        part_text = clean_text(part)
        if part_text:
            cleaned.append(part_text)
    if cleaned and clean_text(cleaned[0]).lower() == clean_text(title).lower():
        return cleaned[1:]
    return cleaned
+
+
def parse_markdown(markdown: str, source_url: str, resolved_url: str, fallback_title: str) -> ParsedDocument:
    """Parse markdown into heading-scoped sections of paragraph/code blocks.

    A line-by-line state machine: fenced code is collected verbatim,
    headings maintain a stack representing the current outline path, and
    blank lines terminate paragraphs. The first H1 becomes the document
    title when no better title was supplied.
    """
    markdown = strip_frontmatter(markdown)
    lines = markdown.splitlines()
    heading_stack: List[str] = []
    sections: List[Section] = []
    current_blocks: List[Block] = []
    current_paragraph: List[str] = []
    current_code: List[str] = []
    in_code_block = False
    document_title = fallback_title

    def flush_paragraph() -> None:
        # Emit the accumulated paragraph unless it is empty or boilerplate.
        nonlocal current_paragraph
        if not current_paragraph:
            return
        paragraph = clean_text("\n".join(current_paragraph))
        current_paragraph = []
        if paragraph and not is_boilerplate_line(paragraph):
            current_blocks.append(Block("paragraph", paragraph))

    def flush_code() -> None:
        # Emit the accumulated fenced code block (fence markers included).
        nonlocal current_code
        if not current_code:
            return
        code = "\n".join(current_code).strip()
        current_code = []
        if code:
            current_blocks.append(Block("code", code))

    def flush_section() -> None:
        # Close the current section under the heading path active so far.
        if current_blocks:
            sections.append(Section(list(heading_stack), list(current_blocks)))
            current_blocks.clear()

    for line in lines:
        if MARKDOWN_FENCE_PATTERN.match(line.strip()):
            if in_code_block:
                current_code.append(line)
                flush_code()
                in_code_block = False
            else:
                flush_paragraph()
                in_code_block = True
                current_code = [line]
            continue
        if in_code_block:
            current_code.append(line)
            continue
        heading_match = MARKDOWN_HEADING_PATTERN.match(line.strip())
        if heading_match:
            flush_paragraph()
            flush_section()
            level = len(heading_match.group(1))
            heading_text = clean_text(heading_match.group(2))
            # Only the first H1 overrides the fallback title.
            if level == 1 and fallback_title == document_title:
                document_title = heading_text
            # Pop deeper/equal headings so the stack mirrors the outline.
            while len(heading_stack) >= level:
                heading_stack.pop()
            heading_stack.append(heading_text)
            continue
        if not line.strip():
            flush_paragraph()
            continue
        current_paragraph.append(line)

    # Flush any trailing content (including an unterminated code fence).
    flush_paragraph()
    flush_code()
    flush_section()
    # Guarantee at least one section so downstream chunking has content.
    if not sections:
        sections.append(Section([], [Block("paragraph", clean_text(markdown))]))
    return ParsedDocument(
        source_url=source_url,
        resolved_url=resolved_url,
        display_title=document_title,
        content_type="markdown",
        sections=sections,
    )
+
+
def _select_html_root(soup: BeautifulSoup):
    """Pick the most content-bearing element to walk; fall back to the page."""
    candidate_selectors = ("main", "article", "[role='main']", ".article", ".content")
    for selector in candidate_selectors:
        match = soup.select_one(selector)
        # Truthiness (not an ``is None`` check) skips empty containers.
        if match:
            return match
    return soup.body or soup
+
+
def _should_skip_html_tag(tag) -> bool:
    """Decide whether a block-level tag would duplicate an ancestor's text.

    get_text() on an outer block tag already includes nested block text,
    so blocks nested inside other blocks are skipped to avoid emitting the
    same content twice.
    """
    if tag.name not in HTML_BLOCK_TAGS:
        return True
    parent = tag.parent
    while parent is not None:
        if getattr(parent, "name", None) in HTML_BLOCK_TAGS:
            # <pre><code> is rendered once, via the <pre> ancestor.
            if tag.name == "code" and parent.name == "pre":
                return True
            # NOTE(review): ``parent`` here is the block-level ancestor being
            # examined, not the tag's direct parent; for an <li> this skips
            # items of nested lists (whose text the outer <li> already
            # contains) — confirm that is the intended behavior.
            if tag.name == "li" and parent.name not in {"ul", "ol"}:
                return True
            # Any other block nested inside a block (e.g. <p> in <li>) is
            # covered by the ancestor's text.
            if tag.name not in {"li"}:
                return True
        parent = parent.parent
    return False
+
+
def parse_html(html: str, source_url: str, resolved_url: str, fallback_title: str) -> ParsedDocument:
    """Parse an HTML page into heading-scoped sections of text/code blocks.

    Page chrome (script/nav/footer/...) is removed, the main content root
    is located, and block-level tags are walked in document order while a
    heading stack tracks the current outline position.
    """
    soup = BeautifulSoup(html, "html.parser")
    # Remove non-content elements before any text extraction.
    for tag in soup.find_all(["script", "style", "nav", "footer", "header", "aside", "noscript", "svg", "form"]):
        tag.decompose()
    root = _select_html_root(soup)
    title = fallback_title
    # Title preference: og:title meta, then <title>, then the fallback.
    if soup.find("meta", attrs={"property": "og:title"}):
        title = clean_text(soup.find("meta", attrs={"property": "og:title"}).get("content", "")) or title
    elif soup.title:
        title = clean_text(soup.title.get_text(" ", strip=True)) or title

    heading_stack: List[str] = []
    sections: List[Section] = []
    current_blocks: List[Block] = []
    first_h1_seen = False

    def flush_section() -> None:
        # Close the current section under the heading path active so far.
        if current_blocks:
            sections.append(Section(list(heading_stack), list(current_blocks)))
            current_blocks.clear()

    for tag in root.find_all(list(HTML_BLOCK_TAGS)):
        if _should_skip_html_tag(tag):
            continue
        # <pre> keeps line breaks; other tags join their text with spaces.
        text = clean_text(tag.get_text("\n" if tag.name == "pre" else " ", strip=True))
        if not text or is_boilerplate_line(text):
            continue
        if tag.name in HTML_HEADING_TAGS:
            flush_section()
            level = int(tag.name[1])
            # Pop deeper/equal headings so the stack mirrors the outline.
            while len(heading_stack) >= level:
                heading_stack.pop()
            heading_stack.append(text)
            # The first H1 on the page wins as the display title.
            if level == 1 and not first_h1_seen:
                title = text
                first_h1_seen = True
            continue
        if tag.name == "table":
            # Flatten tables into pipe-separated rows.
            rows = []
            for row in tag.find_all("tr"):
                values = [clean_text(cell.get_text(" ", strip=True)) for cell in row.find_all(["th", "td"])]
                values = [value for value in values if value]
                if values:
                    rows.append(" | ".join(values))
            text = "\n".join(rows)
        if tag.name in {"pre", "code"}:
            current_blocks.append(Block("code", f"```\n{text}\n```"))
        elif tag.name == "li":
            current_blocks.append(Block("paragraph", f"- {text}"))
        else:
            current_blocks.append(Block("paragraph", text))

    flush_section()
    # Fall back to the whole page text when no block structure was found.
    if not sections:
        page_text = clean_text(root.get_text("\n", strip=True))
        if page_text:
            sections.append(Section([], [Block("paragraph", page_text)]))
    return ParsedDocument(
        source_url=source_url,
        resolved_url=resolved_url,
        display_title=title,
        content_type="html",
        sections=sections,
    )
+
+
def looks_like_heading(paragraph: str) -> bool:
    """Heuristic: short, unterminated, Title/UPPER-cased text is a heading."""
    text = clean_text(paragraph)
    if not text:
        return False
    if len(text) > 120:
        return False
    if text.endswith((".", "!", "?", ":")):
        return False
    if len(text.split()) > 12:
        return False
    return text in (text.title(), text.upper())
+
+
def parse_pdf(pdf_bytes: bytes, source_url: str, resolved_url: str, fallback_title: str) -> ParsedDocument:
    """Parse PDF bytes into per-page sections using layout heuristics.

    Each page starts with a "Page N" heading; a short early paragraph on
    page one is promoted to the document title, and heading-looking
    paragraphs refine the heading path.
    """
    reader = PdfReader(BytesIO(pdf_bytes))
    sections: List[Section] = []
    document_title = fallback_title
    for page_number, page in enumerate(reader.pages, start=1):
        raw_text = clean_text(page.extract_text() or "")
        if not raw_text:
            continue
        # Blank-line gaps in the extracted text delimit paragraphs.
        paragraphs = [clean_text(chunk) for chunk in re.split(r"\n\s*\n", raw_text) if clean_text(chunk)]
        heading_path = [f"Page {page_number}"]
        blocks: List[Block] = []
        for paragraph in paragraphs:
            # The first short paragraph on page one doubles as the title.
            if page_number == 1 and document_title == fallback_title and len(paragraph.split()) <= 12:
                document_title = paragraph
                continue
            if looks_like_heading(paragraph):
                heading_path = [f"Page {page_number}", paragraph]
                continue
            if is_boilerplate_line(paragraph):
                continue
            blocks.append(Block("paragraph", paragraph))
        # NOTE(review): one Section per page, labeled with the page's *last*
        # heading_path — paragraphs preceding a heading get filed under it;
        # confirm this coarse attribution is acceptable.
        if blocks:
            sections.append(Section(heading_path, blocks))
    # Guarantee at least one section so downstream chunking has content.
    if not sections:
        sections.append(Section([], [Block("paragraph", fallback_title)]))
    return ParsedDocument(
        source_url=source_url,
        resolved_url=resolved_url,
        display_title=document_title,
        content_type="pdf",
        sections=sections,
    )
+
+
def parse_document_content(
    source_url: str,
    resolved_url: str,
    response_content: bytes,
    content_type: str,
    fallback_title: str,
) -> ParsedDocument:
    """Dispatch raw fetched bytes to the PDF, markdown, or HTML parser.

    NOTE(review): the patch text was corrupted at the HTML dispatch line
    (an HTML-stripping artifact swallowed everything between ``"<html`` and
    the next function's ``->``); the HTML check and markdown fallback below
    reconstruct the apparent intent — confirm against the source repo.
    """
    content_type = (content_type or "").lower()
    if "pdf" in content_type or resolved_url.lower().endswith(".pdf"):
        return parse_pdf(response_content, source_url, resolved_url, fallback_title)
    decoded = response_content.decode("utf-8", errors="ignore")
    if "markdown" in content_type or resolved_url.lower().endswith(".md"):
        return parse_markdown(decoded, source_url, resolved_url, fallback_title)
    if "html" in content_type or "<html" in decoded[:1024].lower():
        return parse_html(decoded, source_url, resolved_url, fallback_title)
    # Unknown text content: markdown parsing degrades gracefully to paragraphs.
    return parse_markdown(decoded, source_url, resolved_url, fallback_title)


def merge_code_context(blocks: List[Block]) -> List[str]:
    """Merge each code block with small neighboring paragraphs for context.

    A code snippet keeps its short (<=180 estimated tokens) introductory
    paragraph and its short trailing explanation in the same unit, so the
    embedding sees the prose that explains the code.
    """
    merged: List[str] = []
    i = 0
    while i < len(blocks):
        block = blocks[i]
        if block.kind == "code":
            parts = []
            # Pull in the preceding paragraph when it is short enough to
            # act as an introduction; otherwise put it back untouched.
            if merged:
                previous = merged.pop()
                if estimate_tokens(previous) <= 180:
                    parts.append(previous)
                else:
                    merged.append(previous)
            parts.append(block.text)
            # Attach a short trailing explanation paragraph. BUGFIX: the
            # original advanced ``i`` whenever the next block was non-code,
            # silently dropping large paragraphs that followed code; now the
            # paragraph is only consumed when it is actually merged.
            if i + 1 < len(blocks) and blocks[i + 1].kind != "code":
                if estimate_tokens(blocks[i + 1].text) <= 180:
                    parts.append(blocks[i + 1].text)
                    i += 1
            merged.append("\n\n".join(part for part in parts if part))
        else:
            merged.append(block.text)
        i += 1
    return [clean_text(item) for item in merged if clean_text(item)]
+
+
def split_text_recursively(text: str, max_tokens: int) -> List[str]:
    """Split *text* into pieces of at most *max_tokens* estimated tokens.

    Tries progressively finer boundaries: blank-line paragraphs first,
    then sentences (skipped for fenced code so snippets stay intact), and
    finally fixed-size word windows as a last resort.
    """
    text = clean_text(text)
    if not text:
        return []
    if estimate_tokens(text) <= max_tokens:
        return [text]
    parts = [clean_text(part) for part in re.split(r"\n\s*\n", text) if clean_text(part)]
    if len(parts) > 1:
        flattened: List[str] = []
        for part in parts:
            flattened.extend(split_text_recursively(part, max_tokens))
        return flattened
    # Never sentence-split fenced code; fall through to word chopping.
    if "```" not in text:
        sentences = [clean_text(part) for part in SENTENCE_SPLIT_PATTERN.split(text) if clean_text(part)]
        if len(sentences) > 1:
            flattened = []
            for sentence in sentences:
                flattened.extend(split_text_recursively(sentence, max_tokens))
            return flattened
    # Last resort: chop by words; 1/0.85 mirrors the estimate_tokens ratio.
    words = WORD_PATTERN.findall(text)
    step = max(1, int(max_tokens / 0.85))
    return [" ".join(words[index : index + step]) for index in range(0, len(words), step)]
+
+
def overlap_tail(text: str, overlap_tokens: int) -> str:
    """Return the last *overlap_tokens* whitespace-separated words of *text*."""
    words = text.split()
    if len(words) <= overlap_tokens:
        return text
    return " ".join(words[-overlap_tokens:])
+
+
def chunk_section_units(
    units: List[str],
    min_tokens: int,
    max_tokens: int,
    overlap_tokens: int,
) -> List[str]:
    """Pack text units into chunks of roughly min..max estimated tokens.

    Units are first normalized so none exceeds *max_tokens*, then packed
    greedily; consecutive chunks share an *overlap_tokens*-word tail, and
    a tiny trailing chunk is folded into its predecessor.
    """
    normalized_units: List[str] = []
    for unit in units:
        normalized_units.extend(split_text_recursively(unit, max_tokens))

    chunks: List[str] = []
    current_units: List[str] = []
    current_tokens = 0
    for unit in normalized_units:
        unit_tokens = estimate_tokens(unit)
        # Close the chunk only once it is both over budget and big enough.
        if current_units and current_tokens + unit_tokens > max_tokens and current_tokens >= min_tokens:
            current_text = "\n\n".join(current_units)
            chunks.append(current_text.strip())
            # Seed the next chunk with the closed chunk's tail for overlap.
            tail = overlap_tail(current_text, overlap_tokens)
            current_units = [tail] if tail else []
            current_tokens = estimate_tokens(tail)
        current_units.append(unit)
        current_tokens += unit_tokens

    if current_units:
        current_text = "\n\n".join(current_units).strip()
        # Avoid emitting a runt final chunk; merge it into the previous one.
        if chunks and estimate_tokens(current_text) < max(80, min_tokens // 2):
            chunks[-1] = f"{chunks[-1]}\n\n{current_text}".strip()
        else:
            chunks.append(current_text)
    return [chunk for chunk in chunks if clean_text(chunk)]
+
+
def build_chunk_text(title: str, heading_path: List[str], body: str) -> str:
    """Prefix *body* with its document title and heading breadcrumb."""
    breadcrumb_parts = normalize_heading_path(title, heading_path)
    breadcrumb = " > ".join(breadcrumb_parts) if breadcrumb_parts else title
    header = f"Document Title: {title}\nHeading Path: {breadcrumb}"
    return clean_text(f"{header}\n\n{body}")
+
+
def derive_version(title: str, source_url: str, content: str = "") -> str:
    """Best-effort extraction of a version string (or year) from doc metadata."""
    haystack = " ".join([title, source_url, content[:4000]])
    # Dotted or bare version numbers, optionally prefixed with "v".
    version_match = re.search(r"\b(v?\d+(?:\.\d+){0,2})\b", haystack, re.IGNORECASE)
    if version_match:
        return version_match.group(1)
    # Fall back to a 20xx year if no version-like number was found.
    year_match = re.search(r"\b(20\d{2})\b", haystack)
    return year_match.group(1) if year_match else ""
+
+
def derive_product(title: str, source_url: str, doc_type: str, keywords: Iterable[str]) -> str:
    """Classify which product family a document belongs to.

    All checks are case-insensitive. The original lowercased the combined
    haystack but compared against *source_url* with its original casing,
    so e.g. "https://LEARN.ARM.COM/..." fell through to the generic
    fallback; the URL is now lowercased once to match.
    """
    haystack = " ".join([title, source_url, doc_type, *keywords]).lower()
    url = source_url.lower()
    if "graviton" in haystack:
        return "AWS Graviton"
    if "ampere" in haystack or "amperecomputing.com" in url:
        return "Ampere"
    # Word-bounded " arm " check keeps e.g. "farm" from matching.
    if "learn.arm.com" in url or "/arm-" in url or " arm " in f" {haystack} ":
        return "Arm"
    return clean_text(doc_type) or "Documentation"
+
+
def chunk_parsed_document(
    parsed_document: ParsedDocument,
    doc_type: str,
    keywords: List[str],
    min_tokens: int = 300,
    max_tokens: int = 600,
    overlap_tokens: int = 50,
) -> List[Dict[str, str]]:
    """Turn a parsed document into metadata-rich chunk dictionaries.

    Each section's blocks are merged (code with surrounding prose) and
    packed into token-bounded chunks; every chunk carries title, URL,
    heading-path, product, and version metadata for retrieval filtering.

    NOTE(review): "heading_path" holds a list, so the declared value type
    Dict[str, str] is looser than the actual payload.
    """
    chunks: List[Dict[str, str]] = []
    # Product/version are derived once per document, not per chunk.
    product = derive_product(parsed_document.display_title, parsed_document.source_url, doc_type, keywords)
    version = derive_version(parsed_document.display_title, parsed_document.resolved_url)
    for section in parsed_document.sections:
        heading_path = normalize_heading_path(parsed_document.display_title, section.heading_path)
        units = merge_code_context(section.blocks)
        if not units:
            continue
        for chunk_body in chunk_section_units(units, min_tokens, max_tokens, overlap_tokens):
            heading = heading_path[-1] if heading_path else parsed_document.display_title
            chunks.append(
                {
                    "title": parsed_document.display_title,
                    "url": parsed_document.source_url,
                    "resolved_url": parsed_document.resolved_url,
                    "heading": heading,
                    "heading_path": heading_path,
                    "doc_type": doc_type,
                    "product": product,
                    "version": version,
                    "content_type": parsed_document.content_type,
                    "content": build_chunk_text(parsed_document.display_title, heading_path, chunk_body),
                }
            )
    return chunks
diff --git a/embedding-generation/eval_questions.json b/embedding-generation/eval_questions.json
new file mode 100644
index 0000000..242f5a3
--- /dev/null
+++ b/embedding-generation/eval_questions.json
@@ -0,0 +1,85 @@
+[
+ {
+ "question": "How should worker_processes, worker_connections, and keepalive settings be tuned for NGINX on Ampere processors?",
+ "expected_urls": ["https://amperecomputing.com/tuning-guides/nginx-tuning-guide"]
+ },
+ {
+ "question": "Which MySQL configuration and benchmarking steps are recommended to improve OLTP performance on Ampere systems?",
+ "expected_urls": ["https://amperecomputing.com/tuning-guides/mysql-tuning-guide"]
+ },
+ {
+ "question": "What Redis server settings and benchmark client parameters does the Ampere tuning guide focus on?",
+ "expected_urls": ["https://amperecomputing.com/tuning-guides/Redis-setup-and-tuning-guide"]
+ },
+ {
+ "question": "How should Kafka brokers, storage, and benchmark settings be tuned on Ampere for better throughput and latency?",
+ "expected_urls": ["https://amperecomputing.com/tuning-guides/apache-kafka-tuning-guide"]
+ },
+ {
+ "question": "What JVM flags, profiling workflow, and GC advice are recommended for Java on Ampere Altra family processors?",
+ "expected_urls": ["https://amperecomputing.com/tuning-guides/unlocking-java-performance-tuning-guide"]
+ },
+ {
+ "question": "How do locking primitives and memory ordering work on Ampere Altra, and when are barriers required?",
+ "expected_urls": ["https://amperecomputing.com/tuning-guides/locking-primitives-and-memory-ordering"]
+ },
+ {
+ "question": "What huge page sizes are available on Arm64, and when should larger page sizes be used for performance tuning?",
+ "expected_urls": ["https://amperecomputing.com/tuning-guides/understanding-memory-page-sizes-on-arm64"]
+ },
+ {
+ "question": "Which GCC compiler options and tuning recommendations are called out in the 2025 Ampere GCC guide?",
+ "expected_urls": ["https://amperecomputing.com/tutorials/gcc-guide-ampere-processors"]
+ },
+ {
+ "question": "How do I use the Ampere Porting Advisor to inspect Arm64 migration issues before porting an application?",
+ "expected_urls": ["https://amperecomputing.com/tutorials/porting-advisor"]
+ },
+ {
+ "question": "What are the main deployment steps in the reference architecture for running an ELK stack on Google Tau T2A?",
+ "expected_urls": ["https://amperecomputing.com/reference-architecture/deploying-an-elk-stack-on-google-tau-t2a"]
+ },
+ {
+ "question": "How do I build and tune DPDK cryptography workloads on Ampere systems?",
+ "expected_urls": ["https://amperecomputing.com/tuning-guides/dpdk-cryptography-build-and-tuning-guide"]
+ },
+ {
+ "question": "What huge page, NIC, and core-affinity setup is recommended in the DPDK setup and tuning guide for Ampere?",
+ "expected_urls": ["https://amperecomputing.com/tuning-guides/DPDK-setup-and-tuning-guide"]
+ },
+ {
+ "question": "What bare-metal tuning advice does the Hadoop guide provide for Ampere processors?",
+ "expected_urls": ["https://amperecomputing.com/tuning-guides/hadoop-tuning-guide-on-bare-metal"]
+ },
+ {
+ "question": "How should MongoDB be configured and benchmarked on Ampere processors for better performance?",
+ "expected_urls": ["https://amperecomputing.com/tuning-guides/mongoDB-tuning-guide"]
+ },
+ {
+ "question": "What thread-count, connection, and benchmarking guidance does the Memcached tuning guide give for Ampere?",
+ "expected_urls": ["https://amperecomputing.com/tuning-guides/memcached-tuning-guide"]
+ },
+ {
+ "question": "How can cryptography libraries be accelerated on Ampere processors according to the Ampere tutorial?",
+ "expected_urls": ["https://amperecomputing.com/tutorials/cryptography"]
+ },
+ {
+ "question": "What does the Azure Dpsv5 workload brief say about running AI inference workloads on Ampere-based virtual machines?",
+ "expected_urls": ["https://amperecomputing.com/briefs/ai-inference-on-azure-brief"]
+ },
+ {
+ "question": "Which storage layout and deployment pattern is described in the MinIO single-node workload brief on Ampere?",
+ "expected_urls": ["https://www.amperecomputing.com/briefs/minio-on-single-node-brief"]
+ },
+ {
+ "question": "How do I get started with cloud-native FreeBSD on OCI Ampere A1 using Terraform?",
+ "expected_urls": ["https://amperecomputing.com/blogs/getting-cloud-native-with-freebsd-on-oci-ampere-a1-with-terraform-"]
+ },
+ {
+ "question": "In the AWS Graviton performance runbook, how should I define a benchmark and configure the system under test before optimization?",
+ "expected_urls": [
+ "https://github.com/aws/aws-graviton-getting-started/blob/main/perfrunbook/defining_your_benchmark.md",
+ "https://github.com/aws/aws-graviton-getting-started/blob/main/perfrunbook/configuring_your_sut.md"
+ ]
+ }
+]
diff --git a/embedding-generation/evaluate_retrieval.py b/embedding-generation/evaluate_retrieval.py
new file mode 100644
index 0000000..e5e62c0
--- /dev/null
+++ b/embedding-generation/evaluate_retrieval.py
@@ -0,0 +1,122 @@
+"""Run a small retrieval evaluation over the local metadata and index."""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import sys
+from pathlib import Path
+
+from sentence_transformers import SentenceTransformer
+
+
+REPO_ROOT = Path(__file__).resolve().parents[1]
+MCP_LOCAL_DIR = REPO_ROOT / "mcp-local"
+if str(MCP_LOCAL_DIR) not in sys.path:
+ sys.path.insert(0, str(MCP_LOCAL_DIR))
+
+from utils.search_utils import build_bm25_index, deduplicate_urls, hybrid_search, load_metadata, load_usearch_index # noqa: E402
+
+
+def sentence_transformer_cache_folder() -> str | None:
+ return os.getenv("SENTENCE_TRANSFORMERS_HOME") or None
+
+
def evaluate(index_path: Path, metadata_path: Path, eval_path: Path, model_name: str, top_k: int) -> int:
    """Score hybrid retrieval against a labeled question set.

    Loads chunk metadata, the usearch vector index, and a BM25 index, runs
    each question through hybrid search, and prints Hit@1/3/5, MRR, and up
    to ten misses.

    Returns a process exit code: 0 on success, 1 when the metadata or the
    eval file is missing or empty.
    """
    metadata = load_metadata(str(metadata_path))
    if not metadata:
        print(f"Metadata not found or empty: {metadata_path}")
        return 1

    # local_files_only: the model must already be cached (no network pulls).
    embedding_model = SentenceTransformer(
        model_name,
        cache_folder=sentence_transformer_cache_folder(),
        local_files_only=True,
    )
    usearch_index = load_usearch_index(
        str(index_path),
        embedding_model.get_sentence_embedding_dimension(),
    )
    bm25_index = build_bm25_index(metadata)

    with eval_path.open() as file:
        eval_rows = json.load(file)
    if not eval_rows:
        # Guard against an empty eval set, which would otherwise raise
        # ZeroDivisionError in the metric computations below.
        print(f"No eval questions found in: {eval_path}")
        return 1

    hits_at_1 = 0
    hits_at_3 = 0
    hits_at_5 = 0
    reciprocal_ranks = []
    misses = []

    for row in eval_rows:
        raw_results = hybrid_search(
            row["question"],
            usearch_index,
            metadata,
            embedding_model,
            bm25_index,
            k=top_k,
        )
        # At most one chunk per URL so a single document cannot occupy
        # several of the top-k slots.
        results = deduplicate_urls(raw_results, max_chunks_per_url=1)[:top_k]
        ranked_urls = [item["metadata"].get("url") for item in results]
        expected = set(row["expected_urls"])

        # 1-based rank of the first expected URL, or None if not retrieved.
        match_rank = None
        for index, url in enumerate(ranked_urls, start=1):
            if url in expected:
                match_rank = index
                break

        if match_rank == 1:
            hits_at_1 += 1
        if match_rank is not None and match_rank <= 3:
            hits_at_3 += 1
        if match_rank is not None and match_rank <= 5:
            hits_at_5 += 1
        reciprocal_ranks.append(0 if match_rank is None else 1 / match_rank)

        if match_rank is None:
            misses.append(
                {
                    "question": row["question"],
                    "expected_urls": row["expected_urls"],
                    "ranked_urls": ranked_urls,
                }
            )

    total = len(eval_rows)
    print(f"Questions: {total}")
    print(f"Hit@1: {hits_at_1 / total:.2%}")
    print(f"Hit@3: {hits_at_3 / total:.2%}")
    print(f"Hit@5: {hits_at_5 / total:.2%}")
    print(f"MRR: {sum(reciprocal_ranks) / total:.3f}")
    print(f"Misses: {len(misses)}")
    for miss in misses[:10]:
        print()
        print(f"Q: {miss['question']}")
        print(f"Expected: {miss['expected_urls']}")
        print(f"Got: {miss['ranked_urls']}")
    return 0
+
+
def main() -> int:
    """CLI entry point: parse arguments and run the retrieval evaluation."""
    parser = argparse.ArgumentParser(description="Evaluate retrieval over the generated local knowledge base.")
    parser.add_argument("--index-path", default="usearch_index.bin")
    parser.add_argument("--metadata-path", default="metadata.json")
    parser.add_argument("--eval-path", default="eval_questions.json")
    parser.add_argument("--model-name", default="all-MiniLM-L6-v2")
    parser.add_argument("--top-k", type=int, default=5)
    options = parser.parse_args()
    return evaluate(
        index_path=Path(options.index_path),
        metadata_path=Path(options.metadata_path),
        eval_path=Path(options.eval_path),
        model_name=options.model_name,
        top_k=options.top_k,
    )
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/embedding-generation/generate-chunks.py b/embedding-generation/generate-chunks.py
index 2175820..18e56fe 100644
--- a/embedding-generation/generate-chunks.py
+++ b/embedding-generation/generate-chunks.py
@@ -13,14 +13,12 @@
# limitations under the License.
import argparse
-import sys
import os
import re
import uuid
import yaml
import csv
import datetime
-import json
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
@@ -28,6 +26,16 @@
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
+from urllib.parse import parse_qs, urlparse
+
+from document_chunking import (
+ chunk_parsed_document,
+ derive_product,
+ derive_version,
+ normalize_source_url,
+ parse_document_content,
+ source_to_fetch_url,
+)
# Create a session with retry logic for resilient HTTP requests
@@ -88,14 +96,18 @@ def ensure_intrinsic_chunks_from_s3(local_folder='intrinsic_chunks',
2. Learning Path titles must come from index page...send through function along with Graviton.
'''
-yaml_dir = 'yaml_data'
-details_file = 'info/chunk_details.csv'
+yaml_dir = os.getenv('YAML_OUTPUT_DIR', 'yaml_data')
+details_file = os.getenv('CHUNK_DETAILS_FILE', 'info/chunk_details.csv')
chunk_index = 1
# Global var to prevent duplication entries from cross platform learning paths
cross_platform_lps_dont_duplicate = []
+# Cache the ecosystem dashboard page so package entries do not re-fetch the same
+# multi-megabyte HTML document for every source row.
+ecosystem_dashboard_entries = None
+
# Global tracking for vector-db-sources.csv
# Set of URLs already in the CSV (for deduplication)
known_source_urls = set()
@@ -181,11 +193,32 @@ def save_sources_csv(csv_file):
print(f"Saved {len(all_sources)} sources to '{csv_file}'")
class Chunk:
- def __init__(self, title, url, uuid, keywords, content):
+ def __init__(
+ self,
+ title,
+ url,
+ uuid,
+ keywords,
+ content,
+ heading="",
+ heading_path=None,
+ doc_type="",
+ product="",
+ version="",
+ resolved_url="",
+ content_type="",
+ ):
self.title = title
self.url = url
self.uuid = uuid
self.content = content
+ self.heading = heading
+ self.heading_path = heading_path or []
+ self.doc_type = doc_type
+ self.product = product
+ self.version = version
+ self.resolved_url = resolved_url
+ self.content_type = content_type
# Translate keyword list into comma-separated string, and add similar words to keywords.
self.keywords = self.formatKeywords(keywords)
@@ -201,88 +234,161 @@ def toDict(self):
'url': self.url,
'uuid': self.uuid,
'keywords': self.keywords,
- 'content': self.content
+ 'content': self.content,
+ 'heading': self.heading,
+ 'heading_path': self.heading_path,
+ 'doc_type': self.doc_type,
+ 'product': self.product,
+ 'version': self.version,
+ 'resolved_url': self.resolved_url,
+ 'content_type': self.content_type,
}
def __repr__(self):
- return f"Chunk(title={self.title}, focus={self.focus}, url={self.url}, uuid={self.uuid}, display_name={self.display_name}, content={self.content})"
+ return f"Chunk(title={self.title}, url={self.url}, uuid={self.uuid}, heading={self.heading})"
-def createEcosystemDashboardChunks():
- ''' Format of Chunk text_snippet:
- .NET works on Arm Linux servers starting from version 5 released in November 2020.
+def build_ecosystem_dashboard_entries():
+ """Load and cache package-level snippets from the ecosystem dashboard."""
+ global ecosystem_dashboard_entries
+ if ecosystem_dashboard_entries is not None:
+ return ecosystem_dashboard_entries
- [Download .NET here.](https://dotnet.microsoft.com/en-us/download/dotnet)
+ def create_text_snippet(main_row):
+ package_name = main_row.get('data-title')
+ download_link = main_row.find('a', class_='download-icon-a')
+ download_url = download_link.get('href') if download_link else None
- To get started quickly, here are some helpful guides from different sources:
- - [Arm guide](https://learn.arm.com/install-guides/dotnet/)
- - [CSP guide](https://aws.amazon.com/blogs/dotnet/powering-net-8-with-aws-graviton3-benchmarks/)
- - [Official documentation](https://learn.microsoft.com/en-us/dotnet/core/install/linux-ubuntu)
- '''
-
- def createTextSnippet(main_row):
- package_name = row.get('data-title')
- download_url = row.find('a', class_='download-icon-a').get('href')
-
- # Get the support statement
next_row = main_row.find_next_sibling('tr')
- works_on_arm_div = next_row.find('div', class_='description')
-
- arm_support_statement = works_on_arm_div.get_text().replace('\n',' ')
-
- # Get individual links to help
- quick_start_links_div = works_on_arm_div.parent.find_next_sibling('section').find('div', class_='description')
- li_elements = quick_start_links_div.find_all('li')
- get_started_text = ""
- if li_elements:
- get_started_text = "\n\nTo get started quickly, here are some helpful guides from different sources:\n"
- for li in quick_start_links_div.find_all('li'):
- get_started_text = get_started_text + f"- [{li.find('a').get_text()}]({li.find('a').get('href')})\n"
-
-
-
- text_snippet = f"{arm_support_statement}\n\n[Download {package_name} here.]({download_url}){get_started_text}"
- return text_snippet
+ works_on_arm_div = next_row.find('div', class_='description') if next_row else None
+ arm_support_statement = ""
+ if works_on_arm_div:
+ arm_support_statement = works_on_arm_div.get_text(" ", strip=True)
+
+ quick_start_section = None
+ if works_on_arm_div and works_on_arm_div.parent:
+ next_section = works_on_arm_div.parent.find_next_sibling('section')
+ if next_section:
+ quick_start_section = next_section.find('div', class_='description')
+
+ quick_start_lines = []
+ if quick_start_section:
+ for li in quick_start_section.find_all('li'):
+ link = li.find('a')
+ if not link:
+ continue
+ link_text = link.get_text(" ", strip=True)
+ link_href = link.get('href')
+ if link_text and link_href:
+ quick_start_lines.append(f"- [{link_text}]({link_href})")
+
+ snippet_parts = []
+ if arm_support_statement:
+ snippet_parts.append(arm_support_statement)
+ if download_url:
+ snippet_parts.append(f"[Download {package_name} here.]({download_url})")
+ if quick_start_lines:
+ snippet_parts.append(
+ "To get started quickly, here are some helpful guides from different sources:\n"
+ + "\n".join(quick_start_lines)
+ )
+ return "\n\n".join(part for part in snippet_parts if part)
- # Obtain all
url = "https://www.arm.com/developer-hub/ecosystem-dashboard/"
response = http_session.get(url, timeout=60)
+ response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
- rows = soup.find_all('tr', class_=['main-sw-row'])
+ rows = soup.find_all('tr', class_=['main-sw-row'])
+ entries = {}
for row in rows:
- # Obtain details for text snippet
- text_snippet = createTextSnippet(row)
package_name = row.get('data-title')
- package_name_urlized = row.get('data-title-urlized')
+ package_slug = row.get('data-title-urlized')
+ if not package_name or not package_slug:
+ continue
- # Keywords
- keywords=[package_name]
- for c in row.get('class'):
+ keywords = [package_name]
+ for c in row.get('class', []):
if 'tag-' in c:
keywords.append(c.replace('tag-license-','').replace('tag-category-',''))
+ package_url = f"{url}?package={package_slug}"
+ entries[package_slug] = {
+ "display_name": f"Ecosystem Dashboard - {package_name}",
+ "package_name": package_name,
+ "keywords": keywords,
+ "url": package_url,
+ "resolved_url": response.url + f"?package={package_slug}",
+ "content": create_text_snippet(row),
+ }
- package_url = f"{url}?package={package_name_urlized}"
-
- # Register this ecosystem dashboard entry as a source
+ ecosystem_dashboard_entries = entries
+ return ecosystem_dashboard_entries
+
+
+def ecosystem_dashboard_slug_from_url(source_url):
+ query = parse_qs(urlparse(source_url).query)
+ values = query.get("package", [])
+ if values:
+ return values[0].strip()
+ return ""
+
+
+def create_ecosystem_dashboard_chunk(source_url, source_name, keywords_value):
+ package_slug = ecosystem_dashboard_slug_from_url(source_url)
+ if not package_slug:
+ return []
+
+ entry = build_ecosystem_dashboard_entries().get(package_slug)
+ if not entry or not entry["content"]:
+ return []
+
+ keywords = parse_keywords(keywords_value, entry["package_name"])
+ return [
+ createChunk(
+ text_snippet=entry["content"],
+ WEBSITE_url=normalize_source_url(source_url),
+ keywords=keywords,
+ title=entry["display_name"],
+ heading=entry["package_name"],
+ heading_path=[entry["package_name"]],
+ doc_type="Ecosystem Dashboard",
+ product=derive_product(entry["display_name"], source_url, "Ecosystem Dashboard", keywords),
+ version=derive_version(entry["display_name"], entry["resolved_url"], entry["content"]),
+ resolved_url=entry["resolved_url"],
+ content_type="html",
+ )
+ ]
+
+
+def createEcosystemDashboardChunks(emit_chunks=True):
+ for entry in build_ecosystem_dashboard_entries().values():
register_source(
site_name='Ecosystem Dashboard',
license_type='Arm Proprietary',
- display_name=f'Ecosystem Dashboard - {package_name}',
- url=package_url,
- keywords=keywords
+ display_name=entry["display_name"],
+ url=entry["url"],
+ keywords=entry["keywords"]
)
-
+ if not emit_chunks:
+ continue
+
chunk = Chunk(
- title = f"Ecosystem Dashboard - {package_name}",
- url = package_url,
- uuid = str(uuid.uuid4()),
- keywords = keywords,
- content = text_snippet
+ title=entry["display_name"],
+ url=entry["url"],
+ uuid=str(uuid.uuid4()),
+ keywords=entry["keywords"],
+ content=entry["content"],
+ heading=entry["package_name"],
+ heading_path=[entry["package_name"]],
+ doc_type="Ecosystem Dashboard",
+ product=derive_product(entry["display_name"], entry["url"], "Ecosystem Dashboard", entry["keywords"]),
+ version=derive_version(entry["display_name"], entry["resolved_url"], entry["content"]),
+ resolved_url=entry["resolved_url"],
+ content_type="html",
)
- chunkSaveAndTrack(url,chunk)
+ chunkSaveAndTrack(entry["url"], chunk)
- return
+ return
def createIntrinsicsDatabaseChunks():
@@ -403,30 +509,50 @@ def htmlToMarkdown(html_string):
'''
-def processLearningPath(url,type):
+def processLearningPath(url, type, emit_chunks=True):
github_raw_link = "https://raw.githubusercontent.com/ArmDeveloperEcosystem/arm-learning-paths/refs/heads/production/content"
site_link = "https://learn.arm.com"
def chunkizeLearningPath(relative_url, title, keywords):
+ if not emit_chunks:
+ return
if relative_url.endswith('/'):
relative_url = relative_url[:-1]
MARKDOWN_url = github_raw_link + relative_url + '.md'
WEBSITE_url = site_link + relative_url
+ response = fetch_with_logging(MARKDOWN_url)
+ if response is None:
+ return
+ parsed_document = parse_document_content(
+ source_url=WEBSITE_url,
+ resolved_url=response.url,
+ response_content=response.content,
+ content_type=response.headers.get("content-type", "text/markdown"),
+ fallback_title=title,
+ )
+ chunk_payloads = chunk_parsed_document(
+ parsed_document,
+ doc_type=type,
+ keywords=keywords,
+ )
- # 3) Extract markdown, skipping those that are 404ing
- if not URLIsValidCheck(MARKDOWN_url):
- return
- markdown = obtainMarkdownContentFromGitHubMDFile(MARKDOWN_url)
-
- # 4) Get sized text snippets the markdown
- text_snippets = obtainTextSnippets__Markdown(markdown)
-
- # 5) Create chunks for each snippet by adding metadata
- for text_snippet in text_snippets:
- chunk = createChunk(text_snippet, WEBSITE_url, keywords, title)
-
- chunkSaveAndTrack(WEBSITE_url,chunk)
+        # Create chunks for each snippet by adding metadata
+ for payload in chunk_payloads:
+ chunk = createChunk(
+ payload["content"],
+ WEBSITE_url,
+ keywords,
+ payload["title"],
+ heading=payload["heading"],
+ heading_path=payload["heading_path"],
+ doc_type=payload["doc_type"],
+ product=payload["product"],
+ version=payload["version"],
+ resolved_url=payload["resolved_url"],
+ content_type=payload["content_type"],
+ )
+ chunkSaveAndTrack(WEBSITE_url,chunk)
if type == 'Learning Path':
@@ -534,20 +660,20 @@ def chunkizeLearningPath(relative_url, title, keywords):
for guide in multi_install_guides:
sub_ig_rel_url = guide.get('link')
- chunkizeLearningPath(sub_ig_rel_url,title, keywords)
+ chunkizeLearningPath(sub_ig_rel_url,title, keywords)
# If not multi-install (most cases)
else:
chunkizeLearningPath(ig_rel_url,title, keywords)
-def createLearningPathChunks():
+def createLearningPathChunks(emit_chunks=True):
# Find all categories to iterate over
learn_url = "https://learn.arm.com/"
response = http_session.get(learn_url, timeout=60)
soup = BeautifulSoup(response.text, 'html.parser')
# Process Install Guides separately (directly from /install-guides page)
- processLearningPath("/install-guides", "Install Guide")
+ processLearningPath("/install-guides", "Install Guide", emit_chunks=emit_chunks)
# Find category links - main-topic-card elements are now wrapped in tags
# Look for tags that contain main-topic-card divs
@@ -569,7 +695,7 @@ def createLearningPathChunks():
continue
lp_url = learn_url.rstrip('/') + lp_link
# Chunking step
- processLearningPath(lp_url, "Learning Path")
+ processLearningPath(lp_url, "Learning Path", emit_chunks=emit_chunks)
def readInCSV(csv_file):
@@ -581,7 +707,9 @@ def readInCSV(csv_file):
csv_dict = {
'urls': [],
'focus': [],
- 'source_names': []
+ 'source_names': [],
+ 'site_names': [],
+ 'license_types': [],
}
if not os.path.exists(csv_file):
@@ -590,9 +718,11 @@ def readInCSV(csv_file):
with open(csv_file, 'r', newline='', encoding='utf-8') as file:
reader = csv.DictReader(file)
for row in reader:
- csv_dict['urls'].append(row.get('URL', ''))
+ csv_dict['urls'].append(normalize_source_url(row.get('URL', '')))
csv_dict['focus'].append(row.get('Keywords', ''))
csv_dict['source_names'].append(row.get('Display Name', ''))
+ csv_dict['site_names'].append(row.get('Site Name', ''))
+ csv_dict['license_types'].append(row.get('License Type', ''))
return csv_dict, len(csv_dict['urls'])
@@ -601,30 +731,14 @@ def getMarkdownGitHubURLsFromPage(url):
GH_urls = []
SITE_urls = []
- if url == 'https://learn.arm.com/migration':
- github_raw_link = "https://raw.githubusercontent.com/ArmDeveloperEcosystem/arm-learning-paths/refs/heads/main/content"
- github_md_link = github_raw_link + '/migration/_index.md'
-
- SITE_urls.append(url)
- GH_urls.append(github_md_link)
-
- elif '/github.com/aws/aws-graviton-getting-started/' in url:
- github_raw_link = "https://raw.githubusercontent.com/aws/aws-graviton-getting-started/refs/heads/main/"
-
- # Rip off part of the URL after '/main/'
- specific_content = url.split('/main/')[1]
-
- github_md_link = github_raw_link + specific_content
-
- SITE_urls.append(url)
- GH_urls.append(github_md_link)
-
+ fetch_url = source_to_fetch_url(url)
+ if fetch_url != normalize_source_url(url):
+ SITE_urls.append(normalize_source_url(url))
+ GH_urls.append(fetch_url)
else:
print('url doesnt match expected format. Check function and try again.')
print('URL: ',url)
-
-
return GH_urls, SITE_urls
@@ -639,6 +753,25 @@ def URLIsValidCheck(url):
csv_writer = csv.writer(csvfile)
csv_writer.writerow([url,str(http_err)])
return False
+
+
+def fetch_with_logging(url):
+ try:
+ response = http_session.get(url, timeout=60)
+ response.raise_for_status()
+ return response
+ except requests.exceptions.HTTPError as http_err:
+ print(f"HTTP error occurred: {http_err}")
+ with open('info/errors.csv', 'a', newline='') as csvfile:
+ csv_writer = csv.writer(csvfile)
+ csv_writer.writerow([url, str(http_err)])
+ return None
+ except Exception as err:
+ print(f"Other error occurred: {err}")
+ with open('info/errors.csv', 'a', newline='') as csvfile:
+ csv_writer = csv.writer(csvfile)
+ csv_writer.writerow([url, str(err)])
+ return None
except Exception as err:
print(f"Other error occurred: {err}")
with open('info/errors.csv', 'a', newline='') as csvfile:
@@ -652,106 +785,57 @@ def obtainMarkdownContentFromGitHubMDFile(gh_url):
response.raise_for_status() # Ensure we got a valid response
md_content = response.text
-
- # Remove frontmatter bounded by '---'
- md_content = md_content[md_content.find('---', 3) + 3:].strip() # +3 to remove the '---' and strip to remove leading/trailing whitespace
-
return md_content
def obtainTextSnippets__Markdown(content, min_words=300, max_words=500, min_final_words=200):
- """Split content into chunks based on headers and word count constraints."""
-
- # Helper function to count words
- def word_count(text):
- return len(text.split())
-
- # Helper function to split content by a given heading level (e.g., h2, h3, h4)
- def split_by_heading(content, heading_level):
- pattern = re.compile(rf'(?<=\n)({heading_level} .+)', re.IGNORECASE)
- return pattern.split(content)
-
- # Helper function to chunk content
- def create_chunks(content_pieces, heading_level='##'):
- """
- Create chunks from content pieces based on the word count limits.
- """
- chunks = []
- current_chunk = ""
- current_word_count = 0
-
- for piece in content_pieces:
- piece_word_count = word_count(piece)
-
- # Check if the current piece starts with the heading level, indicating the start of a new section
- if re.match(rf'^{heading_level} ', piece.strip()):
- # If the current chunk has enough words, finalize it and start a new chunk
- if current_word_count >= min_words:
- chunks.append(current_chunk.strip())
- current_chunk = ""
- current_word_count = 0
-
- # Add the piece to the current chunk
- if current_word_count + piece_word_count > max_words and current_word_count >= min_words:
- # If adding this piece exceeds max_words, finalize the current chunk
- chunks.append(current_chunk.strip())
- current_chunk = piece.strip()
- current_word_count = piece_word_count
- else:
- current_chunk += piece + "\n"
- current_word_count += piece_word_count
-
- # Handle the last chunk
- if current_chunk.strip():
- if current_word_count < min_final_words and chunks:
- # If the last chunk is too small, merge it with the previous chunk
- chunks[-1] += "\n" + current_chunk.strip()
- else:
- # Otherwise, add it as a separate chunk
- chunks.append(current_chunk.strip())
-
- return chunks
-
- # 1. Split by h2 headings
- content_pieces = split_by_heading(content, '##')
- chunks = create_chunks(content_pieces)
-
- # 2. Further split large chunks by h3 if they exceed max_words
- final_chunks = []
- for chunk in chunks:
- if word_count(chunk) > max_words:
- sub_pieces = split_by_heading(chunk, '###')
- sub_chunks = create_chunks(sub_pieces,'###')
-
- # 3. Further split large sub-chunks by h4 if they exceed max_words
- for sub_chunk in sub_chunks:
- if word_count(sub_chunk) > max_words:
- sub_sub_pieces = split_by_heading(sub_chunk, '####')
- sub_sub_chunks = create_chunks(sub_sub_pieces,'####')
-
- # 4. If still too large, split by paragraph
- for sub_sub_chunk in sub_sub_chunks:
- if word_count(sub_sub_chunk) > max_words:
- paragraphs = sub_sub_chunk.split('\n\n')
- paragraph_chunks = create_chunks(paragraphs)
- final_chunks.extend(paragraph_chunks)
- else:
- final_chunks.append(sub_sub_chunk)
- else:
- final_chunks.append(sub_chunk)
- else:
- final_chunks.append(chunk)
-
- return final_chunks
-
-
-def createChunk(text_snippet,WEBSITE_url,keywords,title):
+ """Backward-compatible wrapper that now uses structured chunking."""
+ if not content or not content.strip():
+ return []
+ parsed_document = parse_document_content(
+ source_url="https://example.com",
+ resolved_url="https://example.com/doc.md",
+ response_content=content.encode("utf-8"),
+ content_type="text/markdown",
+ fallback_title="Document",
+ )
+ chunks = chunk_parsed_document(
+ parsed_document,
+ doc_type="Markdown",
+ keywords=[],
+ min_tokens=min_words,
+ max_tokens=max_words,
+ overlap_tokens=max(0, min_final_words // 4),
+ )
+ return [chunk["content"] for chunk in chunks]
+
+
+def createChunk(
+ text_snippet,
+ WEBSITE_url,
+ keywords,
+ title,
+ heading="",
+ heading_path=None,
+ doc_type="",
+ product="",
+ version="",
+ resolved_url="",
+ content_type="",
+):
chunk = Chunk(
title = title,
url = WEBSITE_url,
uuid = str(uuid.uuid4()),
keywords = keywords,
- content = text_snippet
+ content = text_snippet,
+ heading = heading,
+ heading_path = heading_path or [],
+ doc_type = doc_type,
+ product = product,
+ version = version,
+ resolved_url = resolved_url,
+ content_type = content_type,
)
return chunk
@@ -768,6 +852,48 @@ def printChunks(chunks):
print('='*100)
+def parse_keywords(keywords_value, title=""):
+ keywords = [keyword.strip() for keyword in re.split(r"[;,]", keywords_value or "") if keyword.strip()]
+ if title and title not in keywords:
+ keywords.append(title)
+ return keywords
+
+
+def create_chunks_for_source(source_url, source_name, doc_type, keywords_value):
+ if doc_type == "Ecosystem Dashboard":
+ return create_ecosystem_dashboard_chunk(source_url, source_name, keywords_value)
+
+ fetch_url = source_to_fetch_url(source_url)
+ response = fetch_with_logging(fetch_url)
+ if response is None:
+ print('not valid, ', fetch_url)
+ return []
+ parsed_document = parse_document_content(
+ source_url=normalize_source_url(source_url),
+ resolved_url=response.url,
+ response_content=response.content,
+ content_type=response.headers.get("content-type", ""),
+ fallback_title=source_name,
+ )
+ keywords = parse_keywords(keywords_value, source_name)
+ return [
+ createChunk(
+ text_snippet=payload["content"],
+ WEBSITE_url=payload["url"],
+ keywords=keywords,
+ title=payload["title"],
+ heading=payload["heading"],
+ heading_path=payload["heading_path"],
+ doc_type=payload["doc_type"],
+ product=payload["product"],
+ version=payload["version"],
+ resolved_url=payload["resolved_url"],
+ content_type=payload["content_type"],
+ )
+ for payload in chunk_parsed_document(parsed_document, doc_type=doc_type or "Documentation", keywords=keywords)
+ ]
+
+
def chunkSaveAndTrack(url,chunk):
def addNewRow(current_date,chunk_words,chunk_id):
@@ -828,7 +954,7 @@ def recordChunk():
def main():
-
+ skip_discovery = os.getenv("SKIP_DISCOVERY", "").lower() in {"1", "true", "yes"}
# Ensure intrinsic_chunks folder and files from S3 are present
ensure_intrinsic_chunks_from_s3()
@@ -853,17 +979,23 @@ def main():
# 0) Initialize files
os.makedirs(yaml_dir, exist_ok=True) # create if doesn't exist
- os.makedirs('info', exist_ok=True) # create if doesn't exist
+ details_dir = os.path.dirname(details_file)
+ if details_dir:
+ os.makedirs(details_dir, exist_ok=True)
+ for filename in os.listdir(yaml_dir):
+ if filename.startswith('chunk_') and filename.endswith('.yaml'):
+ os.remove(os.path.join(yaml_dir, filename))
with open(details_file, mode='w', newline='') as file:
writer = csv.writer(file)
writer.writerow(['URL','Date', 'Number of Words', 'Number of Chunks','Chunk IDs'])
# 0) Obtain full database information:
# a) Learning Paths & Install Guides
- createLearningPathChunks()
+ if not skip_discovery:
+ createLearningPathChunks(emit_chunks=False)
- # b) Ecosystem Dashboard
- createEcosystemDashboardChunks()
+ # b) Ecosystem Dashboard
+ createEcosystemDashboardChunks(emit_chunks=False)
# c) Intrinsics
#createIntrinsicsDatabaseChunks()
@@ -875,29 +1007,11 @@ def main():
for i in range(csv_length):
url = csv_dict['urls'][i]
source_name = csv_dict['source_names'][i]
+ doc_type = csv_dict['site_names'][i]
+ keywords_value = csv_dict['focus'][i]
- # 2) Translate a URL into all it's individual page URLs, if applicable, as their raw GitHub MD files --> https://raw.githubusercontent.com/ArmDeveloperEcosystem/arm-learning-paths/refs/heads/main/content/learning-paths/servers-and-cloud-computing/llama-cpu/llama-chatbot.md
- MARKDOWN_urls, WEBSITE_urls = getMarkdownGitHubURLsFromPage(url)
- for j in range(len(MARKDOWN_urls)):
- MARKDOWN_url = MARKDOWN_urls[j]
- WEBSITE_url = WEBSITE_urls[j]
-
- # 3) Extract markdown, skipping those that are 404ing
- if not URLIsValidCheck(MARKDOWN_url):
- print('not valid, ',MARKDOWN_url)
- continue
- markdown = obtainMarkdownContentFromGitHubMDFile(MARKDOWN_url)
-
- # 4) Get keywords (removing -)
- keywords = [source_name.replace(" - ", " ").replace(" ", ", ")]
-
- # 4) Get sized text snippets the markdown
- text_snippets = obtainTextSnippets__Markdown(markdown)
-
- # 5) Create chunks for each snippet by adding metadata
- for text_snippet in text_snippets:
- chunk = createChunk(text_snippet, WEBSITE_url, keywords, source_name)
- chunkSaveAndTrack(url,chunk)
+ for chunk in create_chunks_for_source(url, source_name, doc_type, keywords_value):
+ chunkSaveAndTrack(url, chunk)
# Save updated sources CSV with all discovered sources
save_sources_csv(sources_file)
@@ -906,4 +1020,4 @@ def main():
if __name__ == "__main__":
- main()
\ No newline at end of file
+ main()
diff --git a/embedding-generation/local_vectorstore_creation.py b/embedding-generation/local_vectorstore_creation.py
index 08f5899..f4afeda 100644
--- a/embedding-generation/local_vectorstore_creation.py
+++ b/embedding-generation/local_vectorstore_creation.py
@@ -19,22 +19,27 @@
import json
import os
import glob
-import sys
import datetime
from sentence_transformers import SentenceTransformer
from usearch.index import Index
+def sentence_transformer_cache_folder():
+ return os.getenv("SENTENCE_TRANSFORMERS_HOME") or None
+
+
def load_local_yaml_files() -> List[Dict]:
"""Load locally stored YAML files and return their contents as a list of dictionaries."""
print("Loading local YAML files")
yaml_contents = []
+ intrinsic_dir = os.getenv("INTRINSIC_CHUNKS_DIR", "intrinsic_chunks")
+ yaml_dir = os.getenv("YAML_DATA_DIR", "yaml_data")
- intrinsic_files = glob.glob(os.path.join("intrinsic_chunks", "*.yaml"))
- print(f"Found {len(intrinsic_files)} YAML files in intrinsic_chunks directory")
+ intrinsic_files = glob.glob(os.path.join(intrinsic_dir, "*.yaml"))
+ print(f"Found {len(intrinsic_files)} YAML files in {intrinsic_dir} directory")
- yaml_data_files = glob.glob(os.path.join("yaml_data", "*.yaml"))
- print(f"Found {len(yaml_data_files)} YAML files in yaml_data directory")
+ yaml_data_files = glob.glob(os.path.join(yaml_dir, "*.yaml"))
+ print(f"Found {len(yaml_data_files)} YAML files in {yaml_dir} directory")
# Combine all files
all_files = intrinsic_files + yaml_data_files
@@ -42,12 +47,13 @@ def load_local_yaml_files() -> List[Dict]:
print(f"Total files to process: {total_files}")
for i, file_path in enumerate(all_files, 1):
- print(f"Loading file {i}/{total_files}: {file_path}")
+ if i <= 10 or i % 1000 == 0 or i == total_files:
+ print(f"Loading file {i}/{total_files}: {file_path}")
# Extract chunk identifier based on file location
- if file_path.startswith("intrinsic_chunks"):
+ if os.path.normpath(file_path).startswith(os.path.normpath(intrinsic_dir)):
chunk_uuid = f"intrinsic_{os.path.basename(file_path).replace('.yaml', '')}"
- elif file_path.startswith("yaml_data"):
+ elif os.path.normpath(file_path).startswith(os.path.normpath(yaml_dir)):
chunk_uuid = f"yaml_data_{os.path.basename(file_path).replace('.yaml', '')}"
else:
chunk_uuid = file_path.replace('chunk_', '').replace('.yaml', '')
@@ -68,7 +74,11 @@ def load_local_yaml_files() -> List[Dict]:
def create_embeddings(contents: List[str], model_name: str = 'all-MiniLM-L6-v2') -> np.ndarray:
"""Create embeddings for the given contents using SentenceTransformers."""
print(f"Creating embeddings using model: {model_name}")
- model = SentenceTransformer(model_name)
+ model = SentenceTransformer(
+ model_name,
+ cache_folder=sentence_transformer_cache_folder(),
+ local_files_only=True,
+ )
embeddings = model.encode(contents, show_progress_bar=True, convert_to_numpy=True)
print(f"Created embeddings with shape: {embeddings.shape}")
return embeddings
@@ -96,9 +106,6 @@ def create_usearch_index(embeddings: np.ndarray, metadata: List[Dict]) -> Tuple[
print(f"Adding {num_vectors} vectors to the index")
for i, embedding in enumerate(embeddings):
index.add(i, embedding)
-
- for item, vec in zip(metadata, embeddings):
- item['vector'] = vec.tolist()
print(f"Added {len(index)} vectors to the index")
return index, metadata
@@ -115,15 +122,39 @@ def main():
contents = []
metadata = []
for i, yaml_content in enumerate(yaml_contents, 1):
- print(f"Processing YAML content {i}/{len(yaml_contents)}")
+ if i <= 10 or i % 1000 == 0 or i == len(yaml_contents):
+ print(f"Processing YAML content {i}/{len(yaml_contents)}")
contents.append(yaml_content['content'])
+ heading_path = yaml_content.get('heading_path', []) or []
+ search_text = " ".join(
+ str(value)
+ for value in [
+ yaml_content.get('title', ''),
+ " ".join(heading_path),
+ yaml_content.get('heading', ''),
+ yaml_content.get('doc_type', ''),
+ yaml_content.get('product', ''),
+ yaml_content.get('version', ''),
+ yaml_content.get('keywords', ''),
+ yaml_content.get('content', ''),
+ ]
+ if value
+ )
metadata.append({
'uuid': yaml_content['uuid'],
'url': yaml_content['url'],
+ 'resolved_url': yaml_content.get('resolved_url', yaml_content['url']),
'original_text': yaml_content['content'],
'title': yaml_content['title'],
'keywords': yaml_content['keywords'],
- 'chunk_uuid': yaml_content['chunk_uuid']
+ 'chunk_uuid': yaml_content['chunk_uuid'],
+ 'heading': yaml_content.get('heading', ''),
+ 'heading_path': heading_path,
+ 'doc_type': yaml_content.get('doc_type', ''),
+ 'product': yaml_content.get('product', ''),
+ 'version': yaml_content.get('version', ''),
+ 'content_type': yaml_content.get('content_type', ''),
+ 'search_text': search_text,
})
# Create embeddings
@@ -139,12 +170,12 @@ def main():
index, metadata = create_usearch_index(embeddings, metadata)
# Save the USearch index
- index_filename = 'usearch_index.bin'
+ index_filename = os.getenv('USEARCH_INDEX_FILENAME', 'usearch_index.bin')
print(f"Saving USearch index to {index_filename}")
index.save(index_filename)
# Save metadata
- metadata_filename = 'metadata.json'
+ metadata_filename = os.getenv('METADATA_FILENAME', 'metadata.json')
print(f"Saving metadata to {metadata_filename}")
with open(metadata_filename, 'w') as f:
json.dump(metadata, f, indent=2)
@@ -155,4 +186,4 @@ def main():
print(f"Metadata saved to: {os.path.abspath(metadata_filename)}")
if __name__ == "__main__":
- main()
\ No newline at end of file
+ main()
diff --git a/embedding-generation/requirements.txt b/embedding-generation/requirements.txt
index fc8fd7b..f6846d7 100644
--- a/embedding-generation/requirements.txt
+++ b/embedding-generation/requirements.txt
@@ -3,4 +3,6 @@ beautifulsoup4
pyyaml
usearch
boto3
-sentence-transformers
\ No newline at end of file
+sentence-transformers
+pypdf
+rank-bm25
diff --git a/embedding-generation/tests/test_generate_chunks.py b/embedding-generation/tests/test_generate_chunks.py
index 96ecfa1..86f8b35 100644
--- a/embedding-generation/tests/test_generate_chunks.py
+++ b/embedding-generation/tests/test_generate_chunks.py
@@ -73,18 +73,30 @@ def test_chunk_to_dict(self, gc):
url="https://example.com",
uuid="test-uuid",
keywords=["key1", "key2"],
- content="Test content"
+ content="Test content",
+ heading="Install",
+ heading_path=["Guide", "Install"],
+ doc_type="Tutorial",
+ product="Ampere",
+ version="2025",
+ resolved_url="https://example.com/resolved",
+ content_type="markdown",
)
result = chunk.toDict()
- assert result == {
- 'title': "Test Title",
- 'url': "https://example.com",
- 'uuid': "test-uuid",
- 'keywords': "key1, key2",
- 'content': "Test content"
- }
+ assert result["title"] == "Test Title"
+ assert result["url"] == "https://example.com"
+ assert result["uuid"] == "test-uuid"
+ assert result["keywords"] == "key1, key2"
+ assert result["content"] == "Test content"
+ assert result["heading"] == "Install"
+ assert result["heading_path"] == ["Guide", "Install"]
+ assert result["doc_type"] == "Tutorial"
+ assert result["product"] == "Ampere"
+ assert result["version"] == "2025"
+ assert result["resolved_url"] == "https://example.com/resolved"
+ assert result["content_type"] == "markdown"
def test_chunk_empty_keywords(self, gc):
"""Test Chunk with empty keywords list."""
@@ -371,6 +383,43 @@ def test_respects_max_words(self, gc):
# With headers, content should be split into multiple chunks
assert len(chunks) >= 2
+ def test_prepends_document_title_and_heading_path(self, gc):
+ """Structured chunks should carry the document title and heading path prefix."""
+ content = """
+# Deployment Guide
+
+## Install
+""" + "word " * 350
+
+ chunks = gc.obtainTextSnippets__Markdown(content, min_words=150, max_words=400)
+
+ assert len(chunks) >= 1
+ assert chunks[0].startswith("Document Title: Deployment Guide")
+ assert "Heading Path: Install" in chunks[0]
+
+ def test_keeps_code_with_neighboring_explanation(self, gc):
+ """Code blocks should remain grouped with nearby explanatory text."""
+ content = """
+# Example Guide
+
+## Build
+First install dependencies and verify the environment is ready for compilation.
+
+```bash
+make build
+make test
+```
+
+Use the generated binary to verify the expected output and continue with setup.
+""" + ("\n\nAdditional context. " * 120)
+
+ chunks = gc.obtainTextSnippets__Markdown(content, min_words=100, max_words=250)
+
+ matching = [chunk for chunk in chunks if "make build" in chunk]
+ assert matching
+ assert "First install dependencies" in matching[0]
+ assert "Use the generated binary" in matching[0]
+
class TestReadInCSV:
"""Tests for readInCSV function."""
@@ -390,6 +439,8 @@ def test_read_csv_basic(self, gc, tmp_path):
assert csv_dict['urls'] == ['https://example.com/1', 'https://example.com/2']
assert csv_dict['source_names'] == ['Display1', 'Display2']
assert csv_dict['focus'] == ['key1', 'key2']
+ assert csv_dict['site_names'] == ['Site1', 'Site2']
+ assert csv_dict['license_types'] == ['MIT', 'Apache']
def test_read_csv_empty(self, gc, tmp_path):
"""Test reading an empty CSV (header only)."""
diff --git a/embedding-generation/vector-db-sources.csv b/embedding-generation/vector-db-sources.csv
index a219193..930c53d 100755
--- a/embedding-generation/vector-db-sources.csv
+++ b/embedding-generation/vector-db-sources.csv
@@ -1672,3 +1672,91 @@ Ecosystem Dashboard,Arm Proprietary,Ecosystem Dashboard - Zookeeper,https://www.
Ecosystem Dashboard,Arm Proprietary,Ecosystem Dashboard - Zstandard,https://www.arm.com/developer-hub/ecosystem-dashboard/?package=zstandard,Zstandard; open-source; compression; database
Ecosystem Dashboard,Arm Proprietary,Ecosystem Dashboard - Zulip,https://www.arm.com/developer-hub/ecosystem-dashboard/?package=zulip,Zulip; open-source; messaging__comms; cloud-native
Ecosystem Dashboard,Arm Proprietary,Ecosystem Dashboard - Zulu OpenJDK (Azul Systems),https://www.arm.com/developer-hub/ecosystem-dashboard/?package=zulu-openjdk-azul-systems,Zulu OpenJDK (Azul Systems); open-source; runtimes; languages
+Reference Architecture,,Deploying an ELK stack,https://amperecomputing.com/reference-architecture/deploying-an-elk-stack-on-google-tau-t2a,
+Tuning Guide,,DPDK Cryptography Build and Tuning Guide,https://amperecomputing.com/tuning-guides/dpdk-cryptography-build-and-tuning-guide,
+Tuning Guide,,DPDK Setup and Tuning Guide - Refresh,https://amperecomputing.com/tuning-guides/DPDK-setup-and-tuning-guide,
+Tuning Guide,,Hadoop Tuning Guide,https://amperecomputing.com/tuning-guides/hadoop-tuning-guide-on-bare-metal,
+Tuning Guide,,Kafka Tuning Guide,https://amperecomputing.com/tuning-guides/apache-kafka-tuning-guide,
+Tuning Guide,,Locking primitives and memory ordering on Altra,https://amperecomputing.com/tuning-guides/locking-primitives-and-memory-ordering,
+Tuning Guide,,Memcached Tuning Guide,https://amperecomputing.com/tuning-guides/memcached-tuning-guide,
+Tuning Guide,,MongoDB Tuning Guide,https://amperecomputing.com/tuning-guides/mongoDB-tuning-guide,
+Tuning Guide,,MySQL Tuning Guide,https://amperecomputing.com/tuning-guides/mysql-tuning-guide,
+Tuning Guide,,NGINX Tuning Guide,https://amperecomputing.com/tuning-guides/nginx-tuning-guide,
+Tuning Guide,,PostgreSQL Tuning Guide for Ampere Altra Processors on Oracle Cloud Infrastructure,https://amperecomputing.com/tuning-guides/postgreSQL-tuning-guide,
+Tuning Guide,,Redis Tuning Guide,https://amperecomputing.com/tuning-guides/Redis-setup-and-tuning-guide,
+Tuning Guide,,Tuning guide for video codecs,https://amperecomputing.com/tuning-guides/FFmpeg-Tuning-Guide,
+Tuning Guide,,Unlocking Java Performance on Ampere® Altra® Family Processors,https://amperecomputing.com/tuning-guides/unlocking-java-performance-tuning-guide,
+Tutorial,,Accelerating the Cloud Part 1: Going Cloud Native,https://amperecomputing.com/guides/accelerating-the-cloud/going-cloud-native,
+Tutorial,,Accelerating the Cloud Part 2: The Investment to Go Cloud Native,https://amperecomputing.com/guides/accelerating-the-cloud/The-Investment-to-Go-Cloud-Native,
+Tutorial,,Accelerating the Cloud Part 3: Redeployment Pre-Flight Checklist,https://amperecomputing.com/guides/accelerating-the-cloud/Transitioning-to-Cloud-Native-Pre-Flight-Checklist,
+Tutorial,,Accelerating the Cloud Part 4: What to Expect When Going Cloud Native,https://amperecomputing.com/guides/accelerating-the-cloud/What-to-Expect-When-Going-Cloud-Native,
+Tutorial,,Accelerating the Cloud Part 5: The Final Step,https://amperecomputing.com/guides/accelerating-the-cloud/the-final-steps,
+Tutorial,,Ampere AI,https://amperecomputing.com/solutions/ampere-ai,
+Tutorial,,Ampere AI Optimized Frameworks,https://uawartifacts.blob.core.windows.net/upload-files/Ampere_AI_Optimized_Frameworks_92851db62e.pdf?updated_at=2022-10-04T16:55:44.090Z,
+Tutorial,,Ampere Porting Advisor Tutorial,https://amperecomputing.com/tutorials/porting-advisor,
+Tutorial,,Arm Native,https://amperecomputing.com/solutions/arm-native,
+Tutorial,,Big Data Solutions,https://amperecomputing.com/solutions/big-data,
+Tutorial,,Ceph on Ampere Processors,https://uawartifacts.blob.core.windows.net/upload-files/Ampere_Arm_Processors_for_Ceph_WP_v1_00_20230222_1_fcd19200fb.pdf?updated_at=2023-03-13T18:10:32.078Z,
+Tutorial,,Cloud Native Solutions,https://amperecomputing.com/solutions/cloud-native,
+Tutorial,,Cryptography Library on Ampere Tutorial,https://amperecomputing.com/tutorials/cryptography,
+Tutorial,,FP16 vs Fp32 Data Formats,https://uawartifacts.blob.core.windows.net/upload-files/Fp16_vs_Fp32_Data_Formats_b2bac45bf0.pdf?updated_at=2022-10-04T16:55:45.159Z,
+Tutorial,,GCC Guide for Ampere Processors 2025 - updated,https://amperecomputing.com/tutorials/gcc-guide-ampere-processors,
+Tutorial,,Getting Cloud-Native with FreeBSD in OCI with Ampere A1 and Terraform,https://amperecomputing.com/blogs/getting-cloud-native-with-freebsd-on-oci-ampere-a1-with-terraform-,
+Tutorial,,Getting started on Azure Ampere VMs with Debian using Terraform,https://amperecomputing.com/tutorials/getting-started-on-azure-ampere-VMs-with-Debian-using-Terraform,
+Tutorial,,Getting started on Azure Ampere VMs with Opensuse using Terraform,https://amperecomputing.com/tutorials/getting-started-on-azure-ampere-vms-with-opensuse-using-terraform,
+Tutorial,,Improving the Performance of Atomic Instructions for Ampere,https://amperecomputing.com/tutorials/fixing-page-fault-performance-issue,
+Tutorial,,Introducing Almalinux 9 in OCI using Ampere 1 and Terraform,https://amperecomputing.com/blogs/introducing-almalinux-9-on-oci-ampere-a1-with-terraform,
+Tutorial,,Introducing OpenMandriva in OCI using Ampere A1 and Terraform,https://amperecomputing.com/blogs/introducing-openmandriva-on-oci-ampere-a1-with-terraform-,
+Tutorial,,Memory Page Sizes,https://amperecomputing.com/tuning-guides/understanding-memory-page-sizes-on-arm64,
+Tutorial,,On demand build infrastructure in OCI using Ampere A1 and Terraform,https://amperecomputing.com/blogs/on-demand-build-infrastructure-on-oci-ampere-a1-with-terraform,
+Tutorial,,Optimizing the JVM for Ampere part 1,https://amperecomputing.com/tutorials/optimizing-java-applications-for-arm64-in-the-cloud,
+Tutorial,,The First 10 Questions to Answer while running on Ampere Altra-based Instances,https://amperecomputing.com/tutorials/the-first-10-questions-to-answer-while-running-on-ampere-altra-based-instances,
+Tutorial,,Web Services Reference Architecture,https://uawartifacts.blob.core.windows.net/upload-files/Web_Services_Efficiency_Reference_Architecture_v1_00_20230510_2d10554b8a.pdf?updated_at=2023-05-10T15:10:19.861Z,
+Workload Brief,,AI Inference on Azure Dpsv5 instances,https://amperecomputing.com/briefs/ai-inference-on-azure-brief,
+Workload Brief,,AmpereOne vBench on Bare Metal,https://amperecomputing.com/briefs/x264-on-ampereone-brief,
+Workload Brief,,AmpereOne: DLRM (torchbench) on Bare Metal,https://amperecomputing.com/briefs/recommender-engine-ai-inference-on-ampereone,
+Workload Brief,,AmpereOne: Llama-3 on Bare Metal,https://amperecomputing.com/briefs/llama-3-ai-inference-on-ampereone,
+Workload Brief,,AmpereOne: Memcached on Bare Metal,https://amperecomputing.com/briefs/memcached-on-ampereone,
+Workload Brief,,AmpereOne: MySQL on Bare Metal,https://amperecomputing.com/briefs/mysql-on-ampereone,
+Workload Brief,,AmpereOne: NGINX on Bare Metal,https://amperecomputing.com/briefs/nginx-on-AC04-brief,
+Workload Brief,,AmpereOne: PostgreSQL on Bare Metal,https://amperecomputing.com/briefs/postgresql-on-ampereone,
+Workload Brief,,AmpereOne: Redis on Bare Metal,https://amperecomputing.com/briefs/redis-on-AC04-brief,
+Workload Brief,,Canonical Anbox Cloud Brief,https://amperecomputing.com/briefs/anbox_solution_brief,
+Workload Brief,,Cassandra on Azure,https://amperecomputing.com/briefs/cassandra-on-azure-brief,
+Workload Brief,,Cassandra on Bare Metal,https://amperecomputing.com/briefs/cassandra-workload-brief,
+Workload Brief,,Cassandra on Google Cloud,https://amperecomputing.com/briefs/cassandra-on-google-cloud-brief,
+Workload Brief,,DSB Social Network Brief on Bare Metal,https://amperecomputing.com/briefs/dsb-sn-brief,
+Workload Brief,,DSB Social Network on OCI Brief,https://amperecomputing.com/briefs/dsb-social-network-scale-out-brief,
+Workload Brief,,ElasticSearch on Azure Workload Brief,https://amperecomputing.com/briefs/elasticsearch-on-azure-brief,
+Workload Brief,,ElasticSearch on OCI Workload Brief,https://amperecomputing.com/briefs/elasticsearch-oci-brief,
+Workload Brief,,Hadoop Brief,https://amperecomputing.com/briefs/hadoop-workload-brief,
+Workload Brief,,Hadoop on OCI Workload Brief,https://amperecomputing.com/briefs/hadoop-on-oci-brief,
+Workload Brief,,Kafka Workload Brief,https://amperecomputing.com/briefs/apache-kafka-solution-brief,
+Workload Brief,,Kafka on Azure Brief,https://amperecomputing.com/briefs/kafka-on-azure-brief,
+Workload Brief,,Memcached on Azure,https://amperecomputing.com/briefs/memcached-on-azure-brief,
+Workload Brief,,Memcached on Bare Metal,https://amperecomputing.com/briefs/memcached-workload-brief,
+Workload Brief,,MongoDB Workload Brief on Bare Metal,https://amperecomputing.com/briefs/mongodb-brief,
+Workload Brief,,MySQL on Bare Metal Workload Brief,https://amperecomputing.com/briefs/mysqlserver_workload_brief,
+Workload Brief,,NGINX on Azure Workload Brief - Updated replacement,https://amperecomputing.com/briefs/nginx-on-azure-brief,
+Workload Brief,,NGINX on Bare Metal Workload Brief,https://amperecomputing.com/briefs/nginx-workload-brief,
+Workload Brief,,NGINX on Google Cloud Workload Brief,https://amperecomputing.com/briefs/nginx-on-google-cloud-brief,
+Workload Brief,,Object Storage MinIO Single Node,https://www.amperecomputing.com/briefs/minio-on-single-node-brief,
+Workload Brief,,Redis on Azure Workload Brief - updated replacement,https://amperecomputing.com/briefs/redis-on-azure-brief,
+Workload Brief,,Redis on Bare Metal Workload Brief,https://amperecomputing.com/briefs/redis-workload-brief,
+Workload Brief,,Redis on Google Cloud Workload Brief,https://amperecomputing.com/briefs/redis-on-google-brief,
+Workload Brief,,Spark on OCI Workload Brief,https://amperecomputing.com/briefs/spark-on-OCI-brief,
+Workload Brief,,Spark on Google Cloud Brief,https://amperecomputing.com/briefs/spark-on-google-brief,
+Workload Brief,,Spark on Azure Brief,https://amperecomputing.com/briefs/spark-on-azure-brief,
+Workload Brief,,Spark Workload Brief,https://amperecomputing.com/briefs/spark-workload-brief,
+Workload Brief,,VP9 Video Codec on Google Cloud Workload Brief,https://amperecomputing.com/briefs/vp9-on-google-brief,
+Workload Brief,,x264 on Azure Workload Brief,https://amperecomputing.com/briefs/x264-on-azure-brief,
+Workload Brief,,x264 on Bare Metal Workload Brief,https://amperecomputing.com/briefs/x264_workload_brief,
+Workload Brief,,x264 on Google Cloud Workload Brief,https://amperecomputing.com/briefs/x264-on-google-cloud-brief,
+Workload Brief,,x265 on Azure Workload Brief,https://amperecomputing.com/briefs/x265-on-azure-brief,
+Workload Brief,,x265 on Bare Metal Workload Brief,https://amperecomputing.com/briefs/x265-workload-brief,
+Workload Brief,,x265 on Google Cloud Workload Brief,https://amperecomputing.com/briefs/x265-on-google-cloud-brief,
+Learning Paths,CC4.0,Learning Path - Monitor Azure Cobalt 100 Arm64 virtual machines using Dynatrace OneAgent,https://learn.arm.com/learning-paths/servers-and-cloud-computing/dynatrace-azure/,Containers and Virtualization; Microsoft Azure; Linux; Dynatrace; NGINX; ActiveGate
+Learning Paths,CC4.0,Learning Path - Build Robot Simulation and Reinforcement Learning Workflows with Isaac Sim and Isaac Lab on DGX Spark,https://learn.arm.com/learning-paths/laptops-and-desktops/dgx_spark_isaac_robotics/,ML; Linux; Python; Bash; IsaacSim; IsaacLab
+Learning Paths,CC4.0,Learning Path - Build a customer support chatbot on Android with Llama and ExecuTorch,https://learn.arm.com/learning-paths/mobile-graphics-and-gaming/customer-support-chatbot-with-llama-and-executorch-on-arm-based-mobile-devices/,ML; macOS; Linux; Android; Java; Python; ExecuTorch
+Learning Paths,CC4.0,Learning Path - Run image classification on an Alif Ensemble E8 DevKit using ExecuTorch and Ethos-U85,https://learn.arm.com/learning-paths/embedded-and-microcontrollers/alif-image-classification/,ML; Baremetal; ExecuTorch; PyTorch; GCC; CMSIS-Toolbox; Python
+Learning Paths,CC4.0,Learning Path - Deploy ExecuTorch firmware on NXP FRDM i.MX 93 for Ethos-U65 acceleration,https://learn.arm.com/learning-paths/embedded-and-microcontrollers/observing-ethos-u-on-nxp/,ML; Linux; macOS; Baremetal; Python; PyTorch; ExecuTorch; Arm Compute Library; GCC
diff --git a/mcp-local/Dockerfile b/mcp-local/Dockerfile
index d371376..811f982 100644
--- a/mcp-local/Dockerfile
+++ b/mcp-local/Dockerfile
@@ -15,16 +15,21 @@
# syntax=docker/dockerfile:1.7
ARG EMBEDDINGS_IMAGE=armlimited/arm-mcp:embeddings-latest
+ARG EMBEDDING_MODEL=all-MiniLM-L6-v2
# EMBEDDINGS_IMAGE must point to an embeddings image tag (e.g., armlimited/arm-mcp:embeddings-YYYY-MM-DD).
FROM --platform=linux/arm64 ${EMBEDDINGS_IMAGE} AS embeddings
# Stage 1: Build main application with prebuilt vector database
FROM ubuntu:24.04 AS builder
+ARG EMBEDDING_MODEL=all-MiniLM-L6-v2
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
- WORKSPACE_DIR=/workspace
+ WORKSPACE_DIR=/workspace \
+ HF_HOME=/app/.cache/huggingface \
+ SENTENCE_TRANSFORMERS_HOME=/app/.cache/sentence_transformers \
+ SENTENCE_TRANSFORMER_MODEL=${EMBEDDING_MODEL}
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 python3-venv python3-pip \
@@ -65,6 +70,9 @@ RUN if [ "$TARGETPLATFORM" = "linux/amd64" ]; then \
pip install --no-cache-dir -r requirements.txt; \
fi
+RUN mkdir -p "$HF_HOME" "$SENTENCE_TRANSFORMERS_HOME" && \
+ python -c "from sentence_transformers import SentenceTransformer; import os; SentenceTransformer(os.environ['SENTENCE_TRANSFORMER_MODEL'], cache_folder=os.environ['SENTENCE_TRANSFORMERS_HOME'])"
+
# Copy generated vector database files
RUN mkdir -p ./data
COPY --from=embeddings /embedding-data/metadata.json ./data/metadata.json
@@ -82,6 +90,8 @@ ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
WORKSPACE_DIR=/workspace \
+ HF_HOME=/app/.cache/huggingface \
+ SENTENCE_TRANSFORMERS_HOME=/app/.cache/sentence_transformers \
VIRTUAL_ENV=/app/.venv \
PATH=/app/.venv/bin:$PATH
diff --git a/mcp-local/requirements.txt b/mcp-local/requirements.txt
index bcad0c8..8023a60 100644
--- a/mcp-local/requirements.txt
+++ b/mcp-local/requirements.txt
@@ -5,4 +5,5 @@ boto3
requests
mcp
sentence-transformers
-fastmcp
\ No newline at end of file
+fastmcp
+rank-bm25
diff --git a/mcp-local/server.py b/mcp-local/server.py
index be1d1b1..095d118 100644
--- a/mcp-local/server.py
+++ b/mcp-local/server.py
@@ -14,9 +14,10 @@
from fastmcp import FastMCP
from typing import List, Dict, Any, Optional
+import os
from sentence_transformers import SentenceTransformer
from utils.config import METADATA_PATH, USEARCH_INDEX_PATH, MODEL_NAME, SUPPORTED_SCANNERS, DEFAULT_ARCH
-from utils.search_utils import load_metadata, load_usearch_index, embedding_search, deduplicate_urls
+from utils.search_utils import build_bm25_index, deduplicate_urls, hybrid_search, load_metadata, load_usearch_index
from utils.docker_utils import check_docker_image_architectures
from utils.migrate_ease_utils import run_migrate_ease_scan
from utils.skopeo_tool import skopeo_help, skopeo_inspect
@@ -27,10 +28,35 @@
# Initialize the MCP server
mcp = FastMCP("arm-mcp")
+
+def sentence_transformer_cache_folder() -> str | None:
+ return os.getenv("SENTENCE_TRANSFORMERS_HOME") or None
+
+
+def load_embedding_model() -> SentenceTransformer:
+ try:
+ return SentenceTransformer(
+ MODEL_NAME,
+ cache_folder=sentence_transformer_cache_folder(),
+ local_files_only=True,
+ )
+ except Exception as exc:
+ print(f"Local cache miss for embedding model '{MODEL_NAME}', retrying with network access: {exc}")
+ return SentenceTransformer(
+ MODEL_NAME,
+ cache_folder=sentence_transformer_cache_folder(),
+ local_files_only=False,
+ )
+
+
# Load USearch index and metadata at module load time
METADATA = load_metadata(METADATA_PATH)
-USEARCH_INDEX = load_usearch_index(USEARCH_INDEX_PATH, METADATA)
-EMBEDDING_MODEL = SentenceTransformer(MODEL_NAME)
+EMBEDDING_MODEL = load_embedding_model()
+USEARCH_INDEX = load_usearch_index(
+ USEARCH_INDEX_PATH,
+ EMBEDDING_MODEL.get_sentence_embedding_dimension(),
+)
+BM25_INDEX = build_bm25_index(METADATA)
# error formatter now lives in utils/error_handling.py
@@ -56,15 +82,19 @@ def knowledge_base_search(query: str, invocation_reason: Optional[str] = None) -
List of dictionaries with metadata including url and text snippets.
"""
try:
- embedding_results = embedding_search(query, USEARCH_INDEX, METADATA, EMBEDDING_MODEL)
- deduped = deduplicate_urls(embedding_results)
+ search_results = hybrid_search(query, USEARCH_INDEX, METADATA, EMBEDDING_MODEL, BM25_INDEX)
+ deduped = deduplicate_urls(search_results)
# Only return the relevant fields
formatted = [
{
"url": item["metadata"].get("url"),
"snippet": item["metadata"].get("original_text", item["metadata"].get("content", "")),
"title": item["metadata"].get("title", ""),
- "distance": item.get("distance")
+ "heading": item["metadata"].get("heading", ""),
+ "doc_type": item["metadata"].get("doc_type", ""),
+ "product": item["metadata"].get("product", ""),
+ "distance": item.get("distance"),
+ "score": item.get("rerank_score", item.get("rrf_score")),
}
for item in deduped
]
diff --git a/mcp-local/utils/search_utils.py b/mcp-local/utils/search_utils.py
index bac7fd6..71acc25 100644
--- a/mcp-local/utils/search_utils.py
+++ b/mcp-local/utils/search_utils.py
@@ -12,37 +12,67 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import List, Dict, Any
-from usearch.index import Index
+from typing import Any, Dict, List, Optional
import json
+import os
+import re
+
import numpy as np
+from rank_bm25 import BM25Okapi
from sentence_transformers import SentenceTransformer
-from .config import USEARCH_INDEX_PATH, METADATA_PATH, MODEL_NAME, DISTANCE_THRESHOLD, K_RESULTS
-import os
+from usearch.index import Index
+
+from .config import DISTANCE_THRESHOLD, K_RESULTS
+
+
+SEARCH_TOKEN_PATTERN = re.compile(r"[a-z0-9][a-z0-9_\-+.]*", re.IGNORECASE)
+RRF_K = 60
+SEARCH_STOPWORDS = {
+    "a", "an", "and", "are", "be", "better", "can", "configured", "configuration", "for",
+    "called", "how", "i", "improve", "in", "is", "it", "of", "on", "or", "out", "performance",
+    "processor", "processors", "recommended", "settings", "should", "step", "steps", "system",
+    "systems", "the", "to", "use", "what", "which", "with", "ampere", "arm", "benchmark",
+    "benchmarking", "benchmarked", "benchmarks", "brief", "cloud", "config", "configure",
+    "guide", "options", "reference", "setup", "tutorial", "tune",
+    "tuned", "tuning",
+}
+TUNING_INTENT_TOKENS = {
+ "benchmark", "benchmarking", "benchmarked", "benchmarks", "config", "configure",
+ "configured", "configuration", "latency", "oltp", "optimize", "optimized", "performance",
+ "throughput", "tune", "tuned", "tuning",
+}
+REFERENCE_ARCHITECTURE_INTENT_TOKENS = {
+ "architecture", "deploy", "deployment", "reference", "steps",
+}
+TUTORIAL_INTENT_TOKENS = {
+ "how", "install", "migration", "migrate", "port", "porting", "setup", "tutorial",
+}
+
+def tokenize_for_search(text: str) -> List[str]:
+ return [token.lower() for token in SEARCH_TOKEN_PATTERN.findall(text or "")]
-def load_usearch_index(index_path: str, metadata: List[Dict]) -> Index:
+
+def salient_tokens(text: str) -> List[str]:
+ return [token for token in tokenize_for_search(text) if token not in SEARCH_STOPWORDS]
+
+
+def load_usearch_index(index_path: str, dimension: int) -> Optional[Index]:
"""Load USearch index from file."""
if not os.path.exists(index_path):
print(f"Error: USearch index file '{index_path}' does not exist.")
return None
- if not metadata:
- print("Error: Knowledge base metadata is missing or invalid.")
+ if dimension <= 0:
+ print("Error: Invalid embedding dimension.")
return None
- # Get dimension from the first metadata entry's vector
- dimension = len(metadata[0]['vector'])
-
- # Create index with same parameters as used during creation
index = Index(
ndim=dimension,
- metric='l2sq', # L2 squared distance
- dtype='f32',
+ metric="l2sq",
+ dtype="f32",
connectivity=16,
expansion_add=128,
- expansion_search=64
+ expansion_search=64,
)
-
- # Load the saved index
index.load(index_path)
return index
@@ -52,72 +82,190 @@ def load_metadata(metadata_path: str) -> List[Dict]:
if not os.path.exists(metadata_path):
print(f"Error: Metadata file '{metadata_path}' does not exist.")
return []
- with open(metadata_path, 'r') as f:
- metadata = json.load(f)
- return metadata
+ with open(metadata_path, "r") as file:
+ return json.load(file)
+
+
+def build_bm25_index(metadata: List[Dict]) -> Optional[BM25Okapi]:
+ corpus = [tokenize_for_search(item.get("search_text", "")) for item in metadata]
+ if not any(corpus):
+ return None
+ return BM25Okapi(corpus)
def embedding_search(
- query: str,
- usearch_index: Index,
- metadata: List[Dict],
+ query: str,
+ usearch_index: Optional[Index],
+ metadata: List[Dict],
embedding_model: SentenceTransformer,
- k: int = K_RESULTS
+ k: int = K_RESULTS,
) -> List[Dict[str, Any]]:
"""Search the USearch index with a text query."""
- # Create query embedding
+ if usearch_index is None:
+ return []
query_embedding = embedding_model.encode([query])[0]
-
- # Search in USearch index
matches = usearch_index.search(query_embedding, k)
- results = []
- # Robust handling of USearch Matches object, as in test_vectorstore.py
- if matches is not None:
- try:
- # USearch Matches object can be accessed with .keys and .distances properties
- if hasattr(matches, 'keys') and hasattr(matches, 'distances'):
- labels = matches.keys
- distances = matches.distances
- # Alternative attribute names
- elif hasattr(matches, 'labels') and hasattr(matches, 'distances'):
- labels = matches.labels
- distances = matches.distances
- # Try converting to numpy arrays
- else:
- labels = np.array(matches.keys) if hasattr(matches, 'keys') else None
- distances = np.array(matches.distances) if hasattr(matches, 'distances') else None
- # If tuple (labels, distances)
- if labels is None or distances is None:
- if isinstance(matches, tuple) and len(matches) == 2:
- labels, distances = matches
- elif isinstance(matches, dict):
- labels = matches.get('labels', matches.get('indices'))
- distances = matches.get('distances')
- if labels is not None and distances is not None:
- labels = np.atleast_1d(labels)
- distances = np.atleast_1d(distances)
- for i, (idx, dist) in enumerate(zip(labels, distances)):
- if idx != -1 and float(dist) < DISTANCE_THRESHOLD:
- result = {
- "rank": i + 1,
- "distance": float(dist),
- "metadata": metadata[int(idx)]
- }
- results.append(result)
- except Exception as e:
- print(f"Error processing matches: {e}")
- import traceback
- traceback.print_exc()
+ results: List[Dict[str, Any]] = []
+ if matches is None:
+ return results
+
+ try:
+ labels = getattr(matches, "keys", None)
+ distances = getattr(matches, "distances", None)
+ if labels is None or distances is None:
+ if isinstance(matches, tuple) and len(matches) == 2:
+ labels, distances = matches
+ elif isinstance(matches, dict):
+ labels = matches.get("labels", matches.get("indices"))
+ distances = matches.get("distances")
+ if labels is None or distances is None:
+ return results
+
+ labels = np.atleast_1d(labels)
+ distances = np.atleast_1d(distances)
+ for rank, (idx, dist) in enumerate(zip(labels, distances), start=1):
+ if idx == -1:
+ continue
+ distance = float(dist)
+ if distance < DISTANCE_THRESHOLD:
+ results.append(
+ {
+ "rank": rank,
+ "distance": distance,
+ "metadata": metadata[int(idx)],
+ }
+ )
+ except Exception as exc:
+ print(f"Error processing dense matches: {exc}")
return results
-def deduplicate_urls(embedding_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """Deduplicate metadata based on the 'url' field."""
- seen_urls = set()
+def bm25_search(
+ query: str,
+ metadata: List[Dict],
+ bm25_index: Optional[BM25Okapi],
+ k: int = K_RESULTS,
+) -> List[Dict[str, Any]]:
+ if bm25_index is None:
+ return []
+ tokens = tokenize_for_search(query)
+ if not tokens:
+ return []
+ scores = bm25_index.get_scores(tokens)
+ ranking = np.argsort(scores)[::-1]
+ results: List[Dict[str, Any]] = []
+ for rank, idx in enumerate(ranking[:k], start=1):
+ score = float(scores[idx])
+ if score <= 0:
+ continue
+ results.append(
+ {
+ "rank": rank,
+ "bm25_score": score,
+ "metadata": metadata[int(idx)],
+ }
+ )
+ return results
+
+
+def rerank_candidates(query: str, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+ query_tokens = set(tokenize_for_search(query))
+ if not query_tokens:
+ return candidates
+ salient_query_tokens = set(salient_tokens(query))
+ prefers_tuning_guide = bool(query_tokens & TUNING_INTENT_TOKENS)
+ prefers_reference_architecture = bool(query_tokens & REFERENCE_ARCHITECTURE_INTENT_TOKENS)
+ prefers_tutorial = bool(query_tokens & TUTORIAL_INTENT_TOKENS)
+
+ reranked: List[Dict[str, Any]] = []
+ for candidate in candidates:
+ metadata = candidate["metadata"]
+ full_text_tokens = set(tokenize_for_search(metadata.get("search_text", "")))
+ title_tokens = set(tokenize_for_search(metadata.get("title", "")))
+ heading_tokens = set(tokenize_for_search(" ".join(metadata.get("heading_path", []))))
+ url_tokens = set(tokenize_for_search(metadata.get("url", "")))
+ doc_type = (metadata.get("doc_type", "") or "").strip().lower()
+ overlap = len(query_tokens & full_text_tokens) / len(query_tokens)
+ title_overlap = len(query_tokens & title_tokens) / len(query_tokens)
+ heading_overlap = len(query_tokens & heading_tokens) / len(query_tokens)
+ entity_overlap = 0.0
+ if salient_query_tokens:
+ entity_space = title_tokens | heading_tokens | url_tokens
+ entity_overlap = len(salient_query_tokens & entity_space) / len(salient_query_tokens)
+ exact_entity_bonus = 0.0
+ if salient_query_tokens and (salient_query_tokens & (title_tokens | url_tokens)):
+ exact_entity_bonus = 0.18
+ dense_bonus = 0.0
+ if candidate.get("distance") is not None:
+ dense_bonus = max(0.0, (DISTANCE_THRESHOLD - candidate["distance"]) / DISTANCE_THRESHOLD)
+ sparse_bonus = min(1.0, candidate.get("bm25_score", 0.0) / 10.0)
+ doc_type_bonus = 0.0
+ if prefers_tuning_guide:
+ if doc_type == "tuning guide":
+ doc_type_bonus += 0.30
+ elif "brief" in doc_type:
+ doc_type_bonus -= 0.12
+ if prefers_reference_architecture:
+ if doc_type == "reference architecture":
+ doc_type_bonus += 0.25
+ elif "brief" in doc_type:
+ doc_type_bonus -= 0.05
+ if prefers_tutorial:
+ if doc_type in {"tutorial", "install guide", "learning path"}:
+ doc_type_bonus += 0.10
+ rerank_score = (
+ candidate.get("rrf_score", 0.0)
+ + (0.35 * overlap)
+ + (0.20 * title_overlap)
+ + (0.15 * heading_overlap)
+ + (0.20 * entity_overlap)
+ + (0.15 * dense_bonus)
+ + (0.15 * sparse_bonus)
+ + exact_entity_bonus
+ + doc_type_bonus
+ )
+ reranked.append({**candidate, "rerank_score": rerank_score})
+ return sorted(reranked, key=lambda item: item["rerank_score"], reverse=True)
+
+
+def hybrid_search(
+ query: str,
+ usearch_index: Optional[Index],
+ metadata: List[Dict],
+ embedding_model: SentenceTransformer,
+ bm25_index: Optional[BM25Okapi],
+ k: int = K_RESULTS,
+) -> List[Dict[str, Any]]:
+ candidate_depth = max(k * 20, 100)
+ dense_results = embedding_search(query, usearch_index, metadata, embedding_model, candidate_depth)
+ sparse_results = bm25_search(query, metadata, bm25_index, candidate_depth)
+
+ candidates: Dict[str, Dict[str, Any]] = {}
+ for result in dense_results:
+        chunk_uuid = result["metadata"].get("chunk_uuid") or result["metadata"].get("uuid") or id(result["metadata"])
+ candidates[chunk_uuid] = {**result, "rrf_score": 1 / (RRF_K + result["rank"])}
+
+ for result in sparse_results:
+        chunk_uuid = result["metadata"].get("chunk_uuid") or result["metadata"].get("uuid") or id(result["metadata"])
+ existing = candidates.get(chunk_uuid, {"metadata": result["metadata"], "rrf_score": 0.0})
+ existing["rank"] = min(existing.get("rank", result["rank"]), result["rank"])
+ existing["bm25_score"] = result["bm25_score"]
+ existing["rrf_score"] += 1 / (RRF_K + result["rank"])
+ candidates[chunk_uuid] = existing
+
+ combined = rerank_candidates(query, list(candidates.values()))
+ return combined[:candidate_depth]
+
+
+def deduplicate_urls(results: List[Dict[str, Any]], max_chunks_per_url: int = 1) -> List[Dict[str, Any]]:
+ """Keep the highest-ranked chunk for each URL by default."""
+ seen_counts: Dict[str, int] = {}
deduplicated_results = []
- for item in embedding_results:
+ for item in results:
url = item["metadata"].get("url")
- if url and url not in seen_urls:
- seen_urls.add(url)
+ if not url:
+ continue
+ seen_counts[url] = seen_counts.get(url, 0) + 1
+ if seen_counts[url] <= max_chunks_per_url:
deduplicated_results.append(item)
- return deduplicated_results
\ No newline at end of file
+ return deduplicated_results
From 8fbcd9d45694ebf76f424314126dc89e57493c88 Mon Sep 17 00:00:00 2001
From: Joe Stech <4088382+JoeStech@users.noreply.github.com>
Date: Fri, 20 Mar 2026 16:24:15 -0600
Subject: [PATCH 2/3] Apply suggestion from @Copilot
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
mcp-local/utils/search_utils.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/mcp-local/utils/search_utils.py b/mcp-local/utils/search_utils.py
index 71acc25..90efa8f 100644
--- a/mcp-local/utils/search_utils.py
+++ b/mcp-local/utils/search_utils.py
@@ -211,7 +211,7 @@ def rerank_candidates(query: str, candidates: List[Dict[str, Any]]) -> List[Dict
elif "brief" in doc_type:
doc_type_bonus -= 0.05
if prefers_tutorial:
- if doc_type in {"tutorial", "install guide", "learning path"}:
+ if doc_type in {"tutorial", "install guide", "learning path", "learning paths"}:
doc_type_bonus += 0.10
rerank_score = (
candidate.get("rrf_score", 0.0)
From be402dba7cb2ca60a31d1bdedb42d0fe3a30c9b5 Mon Sep 17 00:00:00 2001
From: Joe Stech <4088382+JoeStech@users.noreply.github.com>
Date: Fri, 20 Mar 2026 16:24:58 -0600
Subject: [PATCH 3/3] Apply suggestion from @Copilot
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
mcp-local/utils/search_utils.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/mcp-local/utils/search_utils.py b/mcp-local/utils/search_utils.py
index 90efa8f..ba04c1b 100644
--- a/mcp-local/utils/search_utils.py
+++ b/mcp-local/utils/search_utils.py
@@ -254,7 +254,7 @@ def hybrid_search(
candidates[chunk_uuid] = existing
combined = rerank_candidates(query, list(candidates.values()))
- return combined[:candidate_depth]
+ return combined[:k]
def deduplicate_urls(results: List[Dict[str, Any]], max_chunks_per_url: int = 1) -> List[Dict[str, Any]]: