
embedding-strategies

$npx skills add wshobson/agents --skill embedding-strategies
SKILL.md


Embedding Strategies

Guide to selecting and optimizing embedding models for vector search applications.

When to Use This Skill

  • Choosing embedding models for RAG
  • Optimizing chunking strategies
  • Fine-tuning embeddings for domains
  • Comparing embedding model performance
  • Reducing embedding dimensions
  • Handling multilingual content

Core Concepts

1. Embedding Model Comparison (2026)

| Model                  | Dimensions | Max Tokens | Best For                            |
|------------------------|------------|------------|-------------------------------------|
| voyage-3-large         | 1024       | 32000      | Claude apps (Anthropic recommended) |
| voyage-3               | 1024       | 32000      | Claude apps, cost-effective         |
| voyage-code-3          | 1024       | 32000      | Code search                         |
| voyage-finance-2       | 1024       | 32000      | Financial documents                 |
| voyage-law-2           | 1024       | 32000      | Legal documents                     |
| text-embedding-3-large | 3072       | 8191       | OpenAI apps, high accuracy          |
| text-embedding-3-small | 1536       | 8191       | OpenAI apps, cost-effective         |
| bge-large-en-v1.5      | 1024       | 512        | Open source, local deployment       |
| all-MiniLM-L6-v2       | 384        | 256        | Fast, lightweight                   |
| multilingual-e5-large  | 1024       | 512        | Multi-language                      |
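
The table maps onto a small selection helper. The sketch below is illustrative only: the model names come straight from the table, but the pick_model function and its heuristics are this guide's own shorthand, not a library API.

def pick_model(domain: str = "general", local_only: bool = False) -> str:
    """Illustrative: choose a model from the comparison table above."""
    if local_only:
        return "BAAI/bge-large-en-v1.5"  # open source, local deployment
    domain_models = {
        "code": "voyage-code-3",
        "finance": "voyage-finance-2",
        "legal": "voyage-law-2",
        "multilingual": "multilingual-e5-large",
    }
    return domain_models.get(domain, "voyage-3-large")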

2. Embedding Pipeline

Document → Chunking → Preprocessing → Embedding Model → Vector
              ↓             ↓                ↓
       [Overlap, Size]  [Clean, Normalize]  [API/Local]
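
Wired together, the stages look roughly like the sketch below. It assumes the Template 1 embeddings client and the chunk_by_tokens helper from Template 4 (both defined in the Templates section); the clean function is a stand-in for whatever preprocessing your domain needs.

import re
from typing import List

def clean(text: str) -> str:
    # Stand-in preprocessing: collapse whitespace
    return re.sub(r'\s+', ' ', text).strip()

def document_to_vectors(document: str) -> List[List[float]]:
    chunks = chunk_by_tokens(document, chunk_size=512, chunk_overlap=50)
    cleaned = [clean(c) for c in chunks]
    return embeddings.embed_documents(cleaned)  # Template 1 Voyage client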

Templates

Template 1: Voyage AI Embeddings (Recommended for Claude)

from langchain_voyageai import VoyageAIEmbeddings
from typing import List
import os

# Initialize Voyage AI embeddings (recommended by Anthropic for Claude)
embeddings = VoyageAIEmbeddings(
    model="voyage-3-large",
    voyage_api_key=os.environ.get("VOYAGE_API_KEY")
)

def get_embeddings(texts: List[str]) -> List[List[float]]:
    """Get embeddings from Voyage AI."""
    return embeddings.embed_documents(texts)

def get_query_embedding(query: str) -> List[float]:
    """Get single query embedding."""
    return embeddings.embed_query(query)

# Specialized models for domains
code_embeddings = VoyageAIEmbeddings(model="voyage-code-3")
finance_embeddings = VoyageAIEmbeddings(model="voyage-finance-2")
legal_embeddings = VoyageAIEmbeddings(model="voyage-law-2")

Template 2: OpenAI Embeddings

from openai import OpenAI
from typing import List, Optional
import numpy as np

client = OpenAI()

def get_embeddings(
    texts: List[str],
    model: str = "text-embedding-3-small",
    dimensions: Optional[int] = None
) -> List[List[float]]:
    """Get embeddings from OpenAI with optional dimension reduction."""
    # Handle batching for large lists
    batch_size = 100
    all_embeddings = []

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]

        kwargs = {"input": batch, "model": model}
        if dimensions:
            # Matryoshka dimensionality reduction
            kwargs["dimensions"] = dimensions

        response = client.embeddings.create(**kwargs)
        embeddings = [item.embedding for item in response.data]
        all_embeddings.extend(embeddings)

    return all_embeddings

def get_embedding(text: str, **kwargs) -> List[float]:
    """Get single embedding."""
    return get_embeddings([text], **kwargs)[0]

# Dimension reduction with Matryoshka embeddings
def get_reduced_embedding(text: str, dimensions: int = 512) -> List[float]:
    """Get embedding with reduced dimensions (Matryoshka)."""
    return get_embedding(
        text,
        model="text-embedding-3-small",
        dimensions=dimensions
    )

Template 3: Local Embeddings with Sentence Transformers

from sentence_transformers import SentenceTransformer
from typing import List, Optional
import numpy as np

class LocalEmbedder:
    """Local embedding with sentence-transformers."""

    def __init__(
        self,
        model_name: str = "BAAI/bge-large-en-v1.5",
        device: str = "cuda"
    ):
        self.model = SentenceTransformer(model_name, device=device)
        self.model_name = model_name

    def embed(
        self,
        texts: List[str],
        normalize: bool = True,
        show_progress: bool = False
    ) -> np.ndarray:
        """Embed texts with optional normalization."""
        embeddings = self.model.encode(
            texts,
            normalize_embeddings=normalize,
            show_progress_bar=show_progress,
            convert_to_numpy=True
        )
        return embeddings

    def embed_query(self, query: str) -> np.ndarray:
        """Embed a query with appropriate prefix for retrieval models."""
        # BGE and similar models benefit from query prefix
        if "bge" in self.model_name.lower():
            query = f"Represent this sentence for searching relevant passages: {query}"
        return self.embed([query])[0]

    def embed_documents(self, documents: List[str]) -> np.ndarray:
        """Embed documents for indexing."""
        return self.embed(documents)

# E5 model with instructions
class E5Embedder:
    def __init__(self, model_name: str = "intfloat/multilingual-e5-large"):
        self.model = SentenceTransformer(model_name)

    def embed_query(self, query: str) -> np.ndarray:
        """E5 requires 'query:' prefix for queries."""
        return self.model.encode(f"query: {query}")

    def embed_document(self, document: str) -> np.ndarray:
        """E5 requires 'passage:' prefix for documents."""
        return self.model.encode(f"passage: {document}")

Template 4: Chunking Strategies

from typing import List, Tuple
import re

def chunk_by_tokens(
    text: str,
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    tokenizer=None
) -> List[str]:
    """Chunk text by token count."""
    import tiktoken
    tokenizer = tokenizer or tiktoken.get_encoding("cl100k_base")

    tokens = tokenizer.encode(text)
    chunks = []

    start = 0
    while start < len(tokens):
        end = start + chunk_size
        chunk_tokens = tokens[start:end]
        chunk_text = tokenizer.decode(chunk_tokens)
        chunks.append(chunk_text)
        if end >= len(tokens):
            break  # done; avoids emitting a trailing overlap-only chunk
        start = end - chunk_overlap

    return chunks

def chunk_by_sentences(
    text: str,
    max_chunk_size: int = 1000,
    min_chunk_size: int = 100
) -> List[str]:
    """Chunk text by sentences, respecting size limits."""
    import nltk
    # Requires NLTK's punkt sentence tokenizer data: nltk.download('punkt')
    sentences = nltk.sent_tokenize(text)

    chunks = []
    current_chunk = []
    current_size = 0

    for sentence in sentences:
        sentence_size = len(sentence)

        if current_size + sentence_size > max_chunk_size and current_chunk:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_size = 0

        current_chunk.append(sentence)
        current_size += sentence_size

    if current_chunk:
        final_chunk = " ".join(current_chunk)
        # Fold a too-small trailing chunk into the previous one
        if chunks and len(final_chunk) < min_chunk_size:
            chunks[-1] += " " + final_chunk
        else:
            chunks.append(final_chunk)

    return chunks

def chunk_by_semantic_sections(
    text: str,
    headers_pattern: str = r'^#{1,3}\s+.+$'
) -> List[Tuple[str, str]]:
    """Chunk markdown by headers, preserving hierarchy."""
    lines = text.split('\n')
    chunks = []
    current_header = ""
    current_content = []

    for line in lines:
        if re.match(headers_pattern, line, re.MULTILINE):
            if current_content:
                chunks.append((current_header, '\n'.join(current_content)))
            current_header = line
            current_content = []
        else:
            current_content.append(line)

    if current_content:
        chunks.append((current_header, '\n'.join(current_content)))

    return chunks

def recursive_character_splitter(
    text: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    separators: List[str] = None
) -> List[str]:
    """LangChain-style recursive splitter."""
    separators = separators or ["\n\n", "\n", ". ", " ", ""]

    def split_text(text: str, separators: List[str]) -> List[str]:
        if not text:
            return []

        separator = separators[0]
        remaining_separators = separators[1:]

        if separator == "":
            # Character-level split
            return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size - chunk_overlap)]

        splits = text.split(separator)
        chunks = []
        current_chunk = []
        current_length = 0

        for split in splits:
            split_length = len(split) + len(separator)

            if current_length + split_length > chunk_size and current_chunk:
                chunk_text = separator.join(current_chunk)

                # Recursively split if still too large
                if len(chunk_text) > chunk_size and remaining_separators:
                    chunks.extend(split_text(chunk_text, remaining_separators))
                else:
                    chunks.append(chunk_text)

                # Start new chunk with overlap
                overlap_splits = []
                overlap_length = 0
                for s in reversed(current_chunk):
                    if overlap_length + len(s) <= chunk_overlap:
                        overlap_splits.insert(0, s)
                        overlap_length += len(s)
                    else:
                        break
                current_chunk = overlap_splits
                current_length = overlap_length

            current_chunk.append(split)
            current_length += split_length

        if current_chunk:
            chunks.append(separator.join(current_chunk))

        return chunks

    return split_text(text, separators)

Template 5: Domain-Specific Embedding Pipeline

import re
from typing import List, Optional
from dataclasses import dataclass
from langchain_voyageai import VoyageAIEmbeddings

@dataclass
class EmbeddedDocument:
    id: str
    document_id: str
    chunk_index: int
    text: str
    embedding: List[float]
    metadata: dict

class DomainEmbeddingPipeline:
    """Pipeline for domain-specific embeddings."""

    def __init__(
        self,
        embedding_model: str = "voyage-3-large",
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        preprocessing_fn=None
    ):
        self.embeddings = VoyageAIEmbeddings(model=embedding_model)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.preprocess = preprocessing_fn or self._default_preprocess

    def _default_preprocess(self, text: str) -> str:
        """Default preprocessing."""
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters (customize for your domain)
        text = re.sub(r'[^\w\s.,!?-]', '', text)
        return text.strip()

    async def process_documents(
        self,
        documents: List[dict],
        id_field: str = "id",
        content_field: str = "content",
        metadata_fields: Optional[List[str]] = None
    ) -> List[EmbeddedDocument]:
        """Process documents for vector storage."""
        processed = []

        for doc in documents:
            content = doc[content_field]
            doc_id = doc[id_field]

            # Preprocess
            cleaned = self.preprocess(content)

            # Chunk (uses chunk_by_tokens from Template 4)
            chunks = chunk_by_tokens(
                cleaned,
                self.chunk_size,
                self.chunk_overlap
            )

            # Create embeddings
            embeddings = await self.embeddings.aembed_documents(chunks)

            # Create records
            for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                metadata = {"document_id": doc_id, "chunk_index": i}

                # Add specified metadata fields
                if metadata_fields:
                    for field in metadata_fields:
                        if field in doc:
                            metadata[field] = doc[field]

                processed.append(EmbeddedDocument(
                    id=f"{doc_id}_chunk_{i}",
                    document_id=doc_id,
                    chunk_index=i,
                    text=chunk,
                    embedding=embedding,
                    metadata=metadata
                ))

        return processed

# Code-specific pipeline
class CodeEmbeddingPipeline:
    """Specialized pipeline for code embeddings."""

    def __init__(self):
        # Use Voyage's code-specific model
        self.embeddings = VoyageAIEmbeddings(model="voyage-code-3")

    def chunk_code(self, code: str, language: str) -> List[dict]:
        """Chunk code by functions/classes using tree-sitter."""
        try:
            import tree_sitter_languages
            parser = tree_sitter_languages.get_parser(language)
            tree = parser.parse(bytes(code, "utf8"))

            chunks = []
            # Extract function and class definitions
            self._extract_nodes(tree.root_node, code, chunks)
            return chunks
        except ImportError:
            # Fallback to simple chunking
            return [{"text": code, "type": "module"}]

    def _extract_nodes(self, node, source_code: str, chunks: list):
        """Recursively extract function/class definitions."""
        if node.type in ['function_definition', 'class_definition', 'method_definition']:
            text = source_code[node.start_byte:node.end_byte]
            chunks.append({
                "text": text,
                "type": node.type,
                "name": self._get_name(node),
                "start_line": node.start_point[0],
                "end_line": node.end_point[0]
            })
        for child in node.children:
            self._extract_nodes(child, source_code, chunks)

    def _get_name(self, node) -> str:
        """Extract name from function/class node."""
        for child in node.children:
            if child.type == 'identifier' or child.type == 'name':
                return child.text.decode('utf8')
        return "unknown"

    async def embed_with_context(
        self,
        chunk: str,
        context: str = ""
    ) -> List[float]:
        """Embed code with surrounding context."""
        if context:
            combined = f"Context: {context}\n\nCode:\n{chunk}"
        else:
            combined = chunk
        return await self.embeddings.aembed_query(combined)

Template 6: Embedding Quality Evaluation

import numpy as np
from typing import List, Dict

def evaluate_retrieval_quality(
    queries: List[str],
    relevant_docs: List[List[str]],   # List of relevant doc IDs per query
    retrieved_docs: List[List[str]],  # List of retrieved doc IDs per query
    k: int = 10
) -> Dict[str, float]:
    """Evaluate embedding quality for retrieval."""

    def precision_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        retrieved_k = retrieved[:k]
        relevant_retrieved = len(set(retrieved_k) & relevant)
        return relevant_retrieved / k if k > 0 else 0

    def recall_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        retrieved_k = retrieved[:k]
        relevant_retrieved = len(set(retrieved_k) & relevant)
        return relevant_retrieved / len(relevant) if relevant else 0

    def mrr(relevant: set, retrieved: List[str]) -> float:
        for i, doc in enumerate(retrieved):
            if doc in relevant:
                return 1 / (i + 1)
        return 0

    def ndcg_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        dcg = sum(
            1 / np.log2(i + 2) if doc in relevant else 0
            for i, doc in enumerate(retrieved[:k])
        )
        ideal_dcg = sum(1 / np.log2(i + 2) for i in range(min(len(relevant), k)))
        return dcg / ideal_dcg if ideal_dcg > 0 else 0

    metrics = {
        f"precision@{k}": [],
        f"recall@{k}": [],
        "mrr": [],
        f"ndcg@{k}": []
    }

    for relevant, retrieved in zip(relevant_docs, retrieved_docs):
        relevant_set = set(relevant)
        metrics[f"precision@{k}"].append(precision_at_k(relevant_set, retrieved, k))
        metrics[f"recall@{k}"].append(recall_at_k(relevant_set, retrieved, k))
        metrics["mrr"].append(mrr(relevant_set, retrieved))
        metrics[f"ndcg@{k}"].append(ndcg_at_k(relevant_set, retrieved, k))

    return {name: np.mean(values) for name, values in metrics.items()}

def compute_embedding_similarity(
    embeddings1: np.ndarray,
    embeddings2: np.ndarray,
    metric: str = "cosine"
) -> np.ndarray:
    """Compute similarity matrix between embedding sets."""
    if metric == "cosine":
        # Normalize and compute dot product
        norm1 = embeddings1 / np.linalg.norm(embeddings1, axis=1, keepdims=True)
        norm2 = embeddings2 / np.linalg.norm(embeddings2, axis=1, keepdims=True)
        return norm1 @ norm2.T
    elif metric == "euclidean":
        from scipy.spatial.distance import cdist
        return -cdist(embeddings1, embeddings2, metric='euclidean')
    elif metric == "dot":
        return embeddings1 @ embeddings2.T
    else:
        raise ValueError(f"Unknown metric: {metric}")

def compare_embedding_models(
    texts: List[str],
    models: Dict[str, callable],
    queries: List[str],
    relevant_indices: List[List[int]],
    k: int = 5
) -> Dict[str, Dict[str, float]]:
    """Compare multiple embedding models on retrieval quality."""
    results = {}

    for model_name, embed_fn in models.items():
        # Embed all texts
        doc_embeddings = np.array(embed_fn(texts))

        retrieved_per_query = []
        for query in queries:
            query_embedding = np.array(embed_fn([query])[0])
            # Compute similarities
            similarities = compute_embedding_similarity(
                query_embedding.reshape(1, -1),
                doc_embeddings,
                metric="cosine"
            )[0]
            # Get top-k indices
            top_k_indices = np.argsort(similarities)[::-1][:k]
            retrieved_per_query.append([str(i) for i in top_k_indices])

        # Convert relevant indices to string IDs
        relevant_docs = [[str(i) for i in indices] for indices in relevant_indices]

        results[model_name] = evaluate_retrieval_quality(
            queries, relevant_docs, retrieved_per_query, k
        )

    return results
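
A quick usage sketch for the evaluation helpers above, with toy data (the doc IDs and relevance judgments are invented for illustration):

queries = ["what is rag?"]
relevant = [["doc_1", "doc_4"]]            # ground-truth IDs per query
retrieved = [["doc_4", "doc_2", "doc_1"]]  # retriever output per query

scores = evaluate_retrieval_quality(queries, relevant, retrieved, k=3)
print(scores)  # ≈ {'precision@3': 0.667, 'recall@3': 1.0, 'mrr': 1.0, 'ndcg@3': 0.92}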

Best Practices

Do's

  • Match model to use case: Code vs prose vs multilingual
  • Chunk thoughtfully: Preserve semantic boundaries
  • Normalize embeddings: For cosine similarity search
  • Batch requests: More efficient than one-by-one
  • Cache embeddings: Avoid recomputing for static content (see the sketch after this list)
  • Use Voyage AI for Claude apps: Recommended by Anthropic
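
A minimal sketch combining the normalization and caching points above. The embed_fn argument is a stand-in for any single-text embedding call from the templates; the helper itself is illustrative, not a library API.

import hashlib
import numpy as np

_embedding_cache: dict = {}

def cached_normalized_embedding(text: str, embed_fn) -> np.ndarray:
    """Cache by content hash; store unit vectors so dot product = cosine."""
    key = hashlib.sha256(text.encode("utf-8")).hexdigest()
    if key not in _embedding_cache:
        vec = np.asarray(embed_fn(text), dtype=np.float32)
        _embedding_cache[key] = vec / np.linalg.norm(vec)
    return _embedding_cache[key]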

Don'ts

  • Don't ignore token limits: Truncation loses information (a check is sketched after this list)
  • Don't mix embedding models: Incompatible vector spaces
  • Don't skip preprocessing: Garbage in, garbage out
  • Don't over-chunk: Lose important context
  • Don't forget metadata: Essential for filtering and debugging
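
To catch the first pitfall before an API silently truncates, count tokens up front. A sketch using tiktoken's cl100k_base encoding as a rough proxy; treat both the encoding and the 8191 default as assumptions to verify against your provider's documentation.

import tiktoken

def check_token_limit(text: str, max_tokens: int = 8191) -> None:
    """Fail loudly instead of letting the API truncate silently."""
    n = len(tiktoken.get_encoding("cl100k_base").encode(text))
    if n > max_tokens:
        raise ValueError(f"Text is {n} tokens; limit is {max_tokens}. Chunk it first.")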

Resources

  • Voyage AI Documentation: https://docs.voyageai.com/
  • OpenAI Embeddings Guide: https://platform.openai.com/docs/guides/embeddings
  • Sentence Transformers: https://www.sbert.net/
  • MTEB Benchmark: https://huggingface.co/spaces/mteb/leaderboard
  • LangChain Embedding Models: https://python.langchain.com/docs/integrations/text_embedding/