Retrieval-Augmented Generation (RAG): Enhancing LLMs with External Knowledge
Retrieval-Augmented Generation (RAG) is a powerful technique that combines the generative capabilities of language models with external knowledge retrieval, enabling more accurate, up-to-date, and factual responses.
Understanding RAG Architecture
Core Components
- Retriever: Finds relevant documents/passages
- Generator: Produces responses based on retrieved context
- Knowledge Base: External source of information
- Encoder: Converts text to vector representations
RAG Workflow
graph LR
A[User Query] --> B[Query Encoding]
B --> C[Document Retrieval]
C --> D[Context Selection]
D --> E[Prompt Construction]
E --> F[LLM Generation]
F --> G[Response]
H[Document Corpus] --> I[Document Processing]
I --> J[Chunking]
J --> K[Embedding]
K --> L[Vector Store]
L --> C
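To make the query-time path concrete, here is a minimal, library-free sketch of the flow in the diagram above. The embed, vector_index, and llm objects are hypothetical stand-ins for whichever embedding model, vector store, and LLM client you use; the sections below build the same pipeline with real components.

from typing import List

def answer_with_rag(query: str, embed, vector_index, llm, k: int = 5) -> str:
    """Query-time RAG: encode the query, retrieve context, then generate."""
    # 1. Query encoding
    query_vector = embed(query)
    # 2-3. Document retrieval and context selection (top-k nearest chunks)
    retrieved_chunks: List[str] = vector_index.search(query_vector, k=k)
    # 4. Prompt construction: ground the model in the retrieved context
    context = "\n\n".join(retrieved_chunks)
    prompt = (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {query}\nAnswer:"
    )
    # 5. LLM generation
    return llm(prompt)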
Building a Basic RAG System
1. Document Processing and Indexing
import os
from typing import List, Dict, Any
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
import chromadb
class DocumentProcessor:
def __init__(self, chunk_size=1000, chunk_overlap=200):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
)
def load_documents(self, file_paths: List[str]) -> List[Dict]:
"""Load documents from various file formats"""
documents = []
for file_path in file_paths:
try:
if file_path.endswith('.pdf'):
loader = PyPDFLoader(file_path)
elif file_path.endswith('.txt'):
loader = TextLoader(file_path)
else:
print(f"Unsupported file format: {file_path}")
continue
docs = loader.load()
for doc in docs:
doc.metadata['source'] = file_path
documents.extend(docs)
except Exception as e:
print(f"Error loading {file_path}: {e}")
return documents
def chunk_documents(self, documents: List[Dict]) -> List[Dict]:
"""Split documents into chunks"""
chunks = []
for doc in documents:
doc_chunks = self.text_splitter.split_documents([doc])
# Add chunk metadata
for i, chunk in enumerate(doc_chunks):
chunk.metadata.update({
'chunk_id': f"{doc.metadata.get('source', 'unknown')}_{i}",
'chunk_index': i,
'total_chunks': len(doc_chunks)
})
chunks.extend(doc_chunks)
return chunks
def process_documents(self, file_paths: List[str]) -> List[Dict]:
"""Complete document processing pipeline"""
print("Loading documents...")
documents = self.load_documents(file_paths)
print(f"Loaded {len(documents)} documents")
print("Chunking documents...")
chunks = self.chunk_documents(documents)
print(f"Created {len(chunks)} chunks")
return chunks
# Usage
processor = DocumentProcessor(chunk_size=1000, chunk_overlap=200)
chunks = processor.process_documents([
"document1.pdf",
"document2.txt",
"document3.pdf"
])
2. Vector Store Creation
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
import pickle
class VectorStoreManager:
def __init__(self, embedding_model="sentence-transformers/all-MiniLM-L6-v2"):
self.embeddings = HuggingFaceEmbeddings(
model_name=embedding_model,
model_kwargs={'device': 'cpu'},
encode_kwargs={'normalize_embeddings': True}
)
self.vector_store = None
def create_vector_store(self, chunks: List[Dict], persist_directory=None):
"""Create vector store from document chunks"""
print("Creating embeddings...")
# Extract text and metadata
texts = [chunk.page_content for chunk in chunks]
metadatas = [chunk.metadata for chunk in chunks]
# Create vector store
if persist_directory:
self.vector_store = Chroma.from_texts(
texts=texts,
metadatas=metadatas,
embedding=self.embeddings,
persist_directory=persist_directory
)
self.vector_store.persist()
else:
self.vector_store = FAISS.from_texts(
texts=texts,
metadatas=metadatas,
embedding=self.embeddings
)
print(f"Vector store created with {len(texts)} documents")
return self.vector_store
def load_vector_store(self, persist_directory):
"""Load existing vector store"""
self.vector_store = Chroma(
persist_directory=persist_directory,
embedding_function=self.embeddings
)
return self.vector_store
def add_documents(self, chunks: List[Dict]):
"""Add new documents to existing vector store"""
if not self.vector_store:
raise ValueError("Vector store not initialized")
texts = [chunk.page_content for chunk in chunks]
metadatas = [chunk.metadata for chunk in chunks]
self.vector_store.add_texts(texts=texts, metadatas=metadatas)
if hasattr(self.vector_store, 'persist'):
self.vector_store.persist()
def search(self, query: str, k: int = 5, score_threshold: float = None):
"""Search for relevant documents"""
if not self.vector_store:
raise ValueError("Vector store not initialized")
        if score_threshold is not None:
            results = self.vector_store.similarity_search_with_score(
                query, k=k
            )
            # FAISS and Chroma return distances here (lower = more similar),
            # so keep documents whose distance is at or below the threshold
            filtered_results = [
                (doc, score) for doc, score in results
                if score <= score_threshold
            ]
            return [doc for doc, score in filtered_results]
else:
return self.vector_store.similarity_search(query, k=k)
# Usage
vector_manager = VectorStoreManager()
vector_store = vector_manager.create_vector_store(
chunks,
persist_directory="./vector_db"
)
3. RAG Pipeline Implementation
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

class RAGPipeline:
    def __init__(self, vector_store, llm_model="gpt-3.5-turbo", temperature=0.1):
        self.vector_store = vector_store
        # gpt-3.5-turbo is a chat model, so use the chat wrapper
        self.llm = ChatOpenAI(model_name=llm_model, temperature=temperature)
self.retriever = vector_store.as_retriever()
# Custom prompt template
self.prompt_template = PromptTemplate(
template="""Use the following pieces of context to answer the question at the end.
If you don't know the answer based on the context, just say that you don't know,
don't try to make up an answer.
Context:
{context}
Question: {question}
Answer: """,
input_variables=["context", "question"]
)
# Create QA chain
self.qa_chain = RetrievalQA.from_chain_type(
llm=self.llm,
chain_type="stuff",
retriever=self.retriever,
chain_type_kwargs={"prompt": self.prompt_template},
return_source_documents=True
)
def query(self, question: str, k: int = 5) -> Dict[str, Any]:
"""Process a query through the RAG pipeline"""
# Update retriever parameters
self.retriever.search_kwargs = {"k": k}
# Get response
result = self.qa_chain({"query": question})
# Format response
response = {
"question": question,
"answer": result["result"],
"source_documents": [
{
"content": doc.page_content,
"metadata": doc.metadata,
"source": doc.metadata.get("source", "unknown")
}
for doc in result["source_documents"]
],
"num_sources": len(result["source_documents"])
}
return response
def batch_query(self, questions: List[str]) -> List[Dict[str, Any]]:
"""Process multiple queries"""
results = []
for question in questions:
result = self.query(question)
results.append(result)
return results
def update_prompt(self, new_template: str):
"""Update the prompt template"""
self.prompt_template = PromptTemplate(
template=new_template,
input_variables=["context", "question"]
)
# Recreate QA chain with new prompt
self.qa_chain = RetrievalQA.from_chain_type(
llm=self.llm,
chain_type="stuff",
retriever=self.retriever,
chain_type_kwargs={"prompt": self.prompt_template},
return_source_documents=True
)
# Usage
rag_pipeline = RAGPipeline(vector_store)
response = rag_pipeline.query("What are the main benefits of machine learning?")
print(f"Answer: {response['answer']}")
print(f"Sources: {response['num_sources']}")
Advanced RAG Techniques
1. Hybrid Search (Dense + Sparse)
from rank_bm25 import BM25Okapi
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
class HybridRetriever:
def __init__(self, documents, embeddings_model, alpha=0.5):
self.documents = documents
self.embeddings_model = embeddings_model
self.alpha = alpha # Weight for dense vs sparse
# Prepare texts
self.texts = [doc.page_content for doc in documents]
# Initialize BM25 (sparse retrieval)
tokenized_texts = [text.split() for text in self.texts]
self.bm25 = BM25Okapi(tokenized_texts)
# Create dense embeddings
self.dense_embeddings = self.embeddings_model.embed_documents(self.texts)
self.dense_embeddings = np.array(self.dense_embeddings)
def search(self, query: str, k: int = 5) -> List[Dict]:
"""Hybrid search combining dense and sparse retrieval"""
# Dense retrieval
query_embedding = self.embeddings_model.embed_query(query)
query_embedding = np.array(query_embedding)
# Calculate cosine similarity
dense_scores = np.dot(self.dense_embeddings, query_embedding) / (
np.linalg.norm(self.dense_embeddings, axis=1) * np.linalg.norm(query_embedding)
)
# Sparse retrieval (BM25)
query_tokens = query.split()
sparse_scores = self.bm25.get_scores(query_tokens)
# Normalize scores
dense_scores = (dense_scores - dense_scores.min()) / (dense_scores.max() - dense_scores.min())
sparse_scores = (sparse_scores - sparse_scores.min()) / (sparse_scores.max() - sparse_scores.min())
# Combine scores
hybrid_scores = self.alpha * dense_scores + (1 - self.alpha) * sparse_scores
# Get top-k results
top_indices = np.argsort(hybrid_scores)[::-1][:k]
results = []
for idx in top_indices:
results.append({
'document': self.documents[idx],
'score': hybrid_scores[idx],
'dense_score': dense_scores[idx],
'sparse_score': sparse_scores[idx]
})
return results
# Usage
hybrid_retriever = HybridRetriever(chunks, vector_manager.embeddings)
results = hybrid_retriever.search("machine learning applications", k=5)
2. Multi-Step Reasoning RAG
class MultiStepRAG:
def __init__(self, vector_store, llm):
self.vector_store = vector_store
self.llm = llm
# Decomposition prompt
self.decompose_prompt = """
Break down this complex question into simpler sub-questions that can be answered independently:
Question: {question}
Sub-questions:
1.
2.
3.
"""
# Synthesis prompt
self.synthesis_prompt = """
Based on the following sub-questions and their answers, provide a comprehensive answer to the original question:
Original Question: {original_question}
Sub-questions and Answers:
{sub_qa_pairs}
Comprehensive Answer:
"""
def decompose_question(self, question: str) -> List[str]:
"""Break complex question into sub-questions"""
prompt = self.decompose_prompt.format(question=question)
response = self.llm(prompt)
# Parse sub-questions (simple parsing)
lines = response.strip().split('\n')
sub_questions = []
for line in lines:
line = line.strip()
if line and (line.startswith(('1.', '2.', '3.', '-', '•'))):
# Clean up the sub-question
sub_question = line.split('.', 1)[-1].strip()
if sub_question:
sub_questions.append(sub_question)
return sub_questions
def answer_sub_question(self, sub_question: str) -> Dict[str, Any]:
"""Answer a single sub-question using RAG"""
# Retrieve relevant documents
relevant_docs = self.vector_store.similarity_search(sub_question, k=3)
# Create context
context = "\n\n".join([doc.page_content for doc in relevant_docs])
# Generate answer
prompt = f"""
Context: {context}
Question: {sub_question}
Answer based on the context:
"""
answer = self.llm(prompt)
return {
'question': sub_question,
'answer': answer.strip(),
'sources': relevant_docs
}
def synthesize_answer(self, original_question: str, sub_qa_pairs: List[Dict]) -> str:
"""Synthesize final answer from sub-answers"""
# Format sub-Q&A pairs
formatted_pairs = []
for i, pair in enumerate(sub_qa_pairs, 1):
formatted_pairs.append(f"Q{i}: {pair['question']}\nA{i}: {pair['answer']}")
sub_qa_text = "\n\n".join(formatted_pairs)
# Generate synthesis
prompt = self.synthesis_prompt.format(
original_question=original_question,
sub_qa_pairs=sub_qa_text
)
final_answer = self.llm(prompt)
return final_answer.strip()
def query(self, question: str) -> Dict[str, Any]:
"""Process complex query with multi-step reasoning"""
# Step 1: Decompose question
sub_questions = self.decompose_question(question)
if not sub_questions:
# Fallback to simple RAG
relevant_docs = self.vector_store.similarity_search(question, k=5)
context = "\n\n".join([doc.page_content for doc in relevant_docs])
prompt = f"""
Context: {context}
Question: {question}
Answer:
"""
answer = self.llm(prompt)
return {
'question': question,
'answer': answer.strip(),
'method': 'simple_rag',
'sources': relevant_docs
}
# Step 2: Answer sub-questions
sub_answers = []
all_sources = []
for sub_q in sub_questions:
sub_result = self.answer_sub_question(sub_q)
sub_answers.append(sub_result)
all_sources.extend(sub_result['sources'])
# Step 3: Synthesize final answer
final_answer = self.synthesize_answer(question, sub_answers)
return {
'question': question,
'answer': final_answer,
'method': 'multi_step',
'sub_questions': sub_questions,
'sub_answers': sub_answers,
'sources': all_sources
}
# Usage (reuse the LLM from the basic RAG pipeline above)
llm = rag_pipeline.llm
multi_step_rag = MultiStepRAG(vector_store, llm)
result = multi_step_rag.query(
"How do machine learning algorithms impact healthcare outcomes and what are the ethical considerations?"
)
3. Self-Reflective RAG
class SelfReflectiveRAG:
def __init__(self, vector_store, llm):
self.vector_store = vector_store
self.llm = llm
self.reflection_prompt = """
Evaluate the following answer to determine if it adequately addresses the question:
Question: {question}
Answer: {answer}
Retrieved Context: {context}
Evaluation Criteria:
1. Completeness: Does the answer fully address all aspects of the question?
2. Accuracy: Is the answer consistent with the provided context?
3. Relevance: Is the answer directly relevant to the question?
4. Clarity: Is the answer clear and well-structured?
Provide a score from 1-10 and explain what could be improved:
Score:
Improvements needed:
"""
self.refinement_prompt = """
Based on the evaluation feedback, improve the following answer:
Original Question: {question}
Original Answer: {original_answer}
Context: {context}
Feedback: {feedback}
Improved Answer:
"""
def evaluate_answer(self, question: str, answer: str, context: str) -> Dict[str, Any]:
"""Evaluate the quality of an answer"""
prompt = self.reflection_prompt.format(
question=question,
answer=answer,
context=context
)
evaluation = self.llm(prompt)
# Parse score (simple parsing)
lines = evaluation.split('\n')
score = None
feedback = evaluation
for line in lines:
if line.strip().startswith('Score:'):
try:
score = int(line.split(':')[1].strip().split()[0])
except:
score = 5 # Default score
break
return {
'score': score or 5,
'feedback': feedback
}
def refine_answer(self, question: str, original_answer: str,
context: str, feedback: str) -> str:
"""Refine answer based on feedback"""
prompt = self.refinement_prompt.format(
question=question,
original_answer=original_answer,
context=context,
feedback=feedback
)
refined_answer = self.llm(prompt)
return refined_answer.strip()
def query(self, question: str, max_iterations: int = 2,
min_score: int = 7) -> Dict[str, Any]:
"""Query with self-reflection and refinement"""
# Initial retrieval and generation
relevant_docs = self.vector_store.similarity_search(question, k=5)
context = "\n\n".join([doc.page_content for doc in relevant_docs])
# Initial answer
initial_prompt = f"""
Context: {context}
Question: {question}
Answer:
"""
current_answer = self.llm(initial_prompt).strip()
iterations = []
for iteration in range(max_iterations):
# Evaluate current answer
evaluation = self.evaluate_answer(question, current_answer, context)
iterations.append({
'iteration': iteration + 1,
'answer': current_answer,
'score': evaluation['score'],
'feedback': evaluation['feedback']
})
# Check if answer is good enough
if evaluation['score'] >= min_score:
break
# Refine answer
if iteration < max_iterations - 1: # Don't refine on last iteration
current_answer = self.refine_answer(
question, current_answer, context, evaluation['feedback']
)
return {
'question': question,
'final_answer': current_answer,
'iterations': iterations,
'sources': relevant_docs,
'final_score': iterations[-1]['score']
}
# Usage
reflective_rag = SelfReflectiveRAG(vector_store, llm)
result = reflective_rag.query(
"What are the key principles of sustainable development?",
max_iterations=3,
min_score=8
)
RAG Optimization Strategies
1. Chunking Optimization
class AdaptiveChunker:
def __init__(self):
self.strategies = {
'fixed': self._fixed_chunking,
'semantic': self._semantic_chunking,
'sliding_window': self._sliding_window_chunking,
'hierarchical': self._hierarchical_chunking
}
def _fixed_chunking(self, text: str, chunk_size: int = 1000,
overlap: int = 200) -> List[str]:
"""Traditional fixed-size chunking"""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunk = text[start:end]
# Try to break at sentence boundary
if end < len(text):
last_period = chunk.rfind('.')
last_newline = chunk.rfind('\n')
break_point = max(last_period, last_newline)
                if break_point > chunk_size * 0.5:  # break point is past the midpoint of this chunk
chunk = text[start:start + break_point + 1]
end = start + break_point + 1
chunks.append(chunk.strip())
start = end - overlap
return chunks
def _semantic_chunking(self, text: str, embeddings_model,
similarity_threshold: float = 0.8) -> List[str]:
"""Chunk based on semantic similarity"""
        sentences = [s for s in text.split('.') if s.strip()]
        if len(sentences) < 2:
            return [text]
        # Get embeddings for sentences
        sentence_embeddings = embeddings_model.embed_documents(sentences)
        chunks = []
        current_chunk = [sentences[0]]
        current_indices = [0]  # indices of the sentences in the current chunk
        for i in range(1, len(sentences)):
            # Calculate similarity between the next sentence and the current chunk
            chunk_embedding = np.mean([
                sentence_embeddings[j] for j in current_indices
            ], axis=0)
            sentence_embedding = sentence_embeddings[i]
            similarity = np.dot(chunk_embedding, sentence_embedding) / (
                np.linalg.norm(chunk_embedding) * np.linalg.norm(sentence_embedding)
            )
            if similarity >= similarity_threshold:
                current_chunk.append(sentences[i])
                current_indices.append(i)
            else:
                # Start new chunk
                chunks.append('.'.join(current_chunk) + '.')
                current_chunk = [sentences[i]]
                current_indices = [i]
# Add final chunk
if current_chunk:
chunks.append('.'.join(current_chunk) + '.')
return chunks
def _sliding_window_chunking(self, text: str, window_size: int = 1000,
step_size: int = 500) -> List[str]:
"""Sliding window chunking with overlap"""
chunks = []
start = 0
while start < len(text):
end = min(start + window_size, len(text))
chunk = text[start:end]
chunks.append(chunk.strip())
if end == len(text):
break
start += step_size
return chunks
def _hierarchical_chunking(self, text: str) -> Dict[str, List[str]]:
"""Create hierarchical chunks at different levels"""
# Level 1: Paragraphs
paragraphs = text.split('\n\n')
# Level 2: Sentences within paragraphs
sentences = []
for para in paragraphs:
para_sentences = para.split('.')
sentences.extend([s.strip() + '.' for s in para_sentences if s.strip()])
# Level 3: Fixed-size chunks
fixed_chunks = self._fixed_chunking(text, chunk_size=500, overlap=100)
return {
'paragraphs': paragraphs,
'sentences': sentences,
'fixed_chunks': fixed_chunks
}
def optimize_chunking(self, documents: List[str],
test_queries: List[str],
embeddings_model) -> Dict[str, Any]:
"""Find optimal chunking strategy for given documents and queries"""
results = {}
for strategy_name, strategy_func in self.strategies.items():
if strategy_name == 'hierarchical':
continue # Skip for this optimization
print(f"Testing {strategy_name} chunking...")
# Apply chunking strategy
all_chunks = []
for doc in documents:
if strategy_name == 'semantic':
chunks = strategy_func(doc, embeddings_model)
else:
chunks = strategy_func(doc)
all_chunks.extend(chunks)
# Create temporary vector store
temp_embeddings = embeddings_model.embed_documents(all_chunks)
# Test retrieval quality
retrieval_scores = []
for query in test_queries:
query_embedding = embeddings_model.embed_query(query)
# Calculate similarities
similarities = [
np.dot(query_embedding, chunk_emb) / (
np.linalg.norm(query_embedding) * np.linalg.norm(chunk_emb)
)
for chunk_emb in temp_embeddings
]
# Get top-5 average similarity
top_similarities = sorted(similarities, reverse=True)[:5]
avg_similarity = np.mean(top_similarities)
retrieval_scores.append(avg_similarity)
results[strategy_name] = {
'avg_retrieval_score': np.mean(retrieval_scores),
'num_chunks': len(all_chunks),
'avg_chunk_length': np.mean([len(chunk) for chunk in all_chunks])
}
# Find best strategy
best_strategy = max(results.keys(),
key=lambda x: results[x]['avg_retrieval_score'])
return {
'results': results,
'best_strategy': best_strategy,
'best_score': results[best_strategy]['avg_retrieval_score']
}
# Usage (sample_documents and sample_queries are your own raw texts and test queries)
chunker = AdaptiveChunker()
optimization_results = chunker.optimize_chunking(
    documents=sample_documents,
    test_queries=sample_queries,
    embeddings_model=vector_manager.embeddings
)
2. Query Enhancement
class QueryEnhancer:
def __init__(self, llm):
self.llm = llm
self.expansion_prompt = """
Expand the following query with related terms and concepts that would help find relevant information:
Original Query: {query}
Expanded Query (include synonyms, related terms, and context):
"""
self.reformulation_prompt = """
Reformulate the following query in different ways to capture various aspects:
Original Query: {query}
Alternative formulations:
1.
2.
3.
"""
def expand_query(self, query: str) -> str:
"""Expand query with related terms"""
prompt = self.expansion_prompt.format(query=query)
expanded = self.llm(prompt)
return expanded.strip()
def reformulate_query(self, query: str) -> List[str]:
"""Generate alternative query formulations"""
prompt = self.reformulation_prompt.format(query=query)
response = self.llm(prompt)
# Parse reformulations
lines = response.strip().split('\n')
reformulations = []
for line in lines:
line = line.strip()
if line and (line.startswith(('1.', '2.', '3.', '-', '•'))):
reformulation = line.split('.', 1)[-1].strip()
if reformulation:
reformulations.append(reformulation)
return reformulations
def enhance_query(self, query: str, strategy: str = 'expansion') -> List[str]:
"""Enhance query using specified strategy"""
if strategy == 'expansion':
enhanced = self.expand_query(query)
return [query, enhanced]
elif strategy == 'reformulation':
reformulations = self.reformulate_query(query)
return [query] + reformulations
elif strategy == 'both':
expanded = self.expand_query(query)
reformulations = self.reformulate_query(query)
return [query, expanded] + reformulations
else:
return [query]
# Usage
query_enhancer = QueryEnhancer(llm)
enhanced_queries = query_enhancer.enhance_query(
"machine learning in healthcare",
strategy='both'
)
RAG Evaluation and Monitoring
1. RAG-Specific Metrics
class RAGEvaluator:
def __init__(self, llm):
self.llm = llm
self.faithfulness_prompt = """
Evaluate if the answer is faithful to the given context. The answer should not contain information that contradicts or goes beyond what's stated in the context.
Context: {context}
Answer: {answer}
Is the answer faithful to the context? (Yes/No)
Explanation:
"""
self.relevance_prompt = """
Evaluate how relevant the retrieved context is to answering the given question.
Question: {question}
Context: {context}
Relevance score (1-5, where 5 is highly relevant):
Explanation:
"""
def evaluate_faithfulness(self, context: str, answer: str) -> Dict[str, Any]:
"""Evaluate if answer is faithful to context"""
prompt = self.faithfulness_prompt.format(context=context, answer=answer)
response = self.llm(prompt)
# Parse response
is_faithful = 'yes' in response.lower().split('\n')[0]
return {
'is_faithful': is_faithful,
'explanation': response
}
def evaluate_context_relevance(self, question: str, context: str) -> Dict[str, Any]:
"""Evaluate relevance of retrieved context"""
prompt = self.relevance_prompt.format(question=question, context=context)
response = self.llm(prompt)
# Parse score
lines = response.split('\n')
score = 3 # Default
for line in lines:
if 'score' in line.lower():
try:
score = int(line.split(':')[1].strip().split()[0])
except:
pass
break
return {
'relevance_score': score,
'explanation': response
}
def evaluate_rag_response(self, question: str, answer: str,
context: str, ground_truth: str = None) -> Dict[str, Any]:
"""Comprehensive RAG evaluation"""
results = {
'question': question,
'answer': answer,
'context': context
}
# Faithfulness
faithfulness = self.evaluate_faithfulness(context, answer)
results['faithfulness'] = faithfulness
# Context relevance
relevance = self.evaluate_context_relevance(question, context)
results['context_relevance'] = relevance
# Answer relevance (how well answer addresses question)
answer_relevance = self.evaluate_context_relevance(question, answer)
results['answer_relevance'] = answer_relevance
# If ground truth available, calculate similarity
if ground_truth:
# Use BERTScore or similar
from bert_score import score
_, _, f1 = score([answer], [ground_truth], lang='en')
results['similarity_to_ground_truth'] = f1.item()
# Overall score
scores = [
1.0 if faithfulness['is_faithful'] else 0.0,
relevance['relevance_score'] / 5.0,
answer_relevance['relevance_score'] / 5.0
]
if ground_truth:
scores.append(results['similarity_to_ground_truth'])
results['overall_score'] = np.mean(scores)
return results
def batch_evaluate(self, test_cases: List[Dict]) -> Dict[str, Any]:
"""Evaluate multiple RAG responses"""
all_results = []
for case in test_cases:
result = self.evaluate_rag_response(
question=case['question'],
answer=case['answer'],
context=case['context'],
ground_truth=case.get('ground_truth')
)
all_results.append(result)
# Aggregate metrics
faithfulness_scores = [
1.0 if r['faithfulness']['is_faithful'] else 0.0
for r in all_results
]
context_relevance_scores = [
r['context_relevance']['relevance_score'] / 5.0
for r in all_results
]
answer_relevance_scores = [
r['answer_relevance']['relevance_score'] / 5.0
for r in all_results
]
overall_scores = [r['overall_score'] for r in all_results]
summary = {
'num_cases': len(test_cases),
'avg_faithfulness': np.mean(faithfulness_scores),
'avg_context_relevance': np.mean(context_relevance_scores),
'avg_answer_relevance': np.mean(answer_relevance_scores),
'avg_overall_score': np.mean(overall_scores),
'detailed_results': all_results
}
return summary
# Usage
evaluator = RAGEvaluator(llm)
test_cases = [
{
'question': 'What is machine learning?',
'answer': 'Machine learning is a subset of AI...',
'context': 'Machine learning algorithms learn patterns...',
'ground_truth': 'Machine learning is a method of data analysis...'
}
]
evaluation_results = evaluator.batch_evaluate(test_cases)
Production Deployment
1. RAG API Service
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Any, Dict, List, Optional
import uvicorn
app = FastAPI(title="RAG API Service")
class QueryRequest(BaseModel):
question: str
k: int = 5
score_threshold: Optional[float] = None
class QueryResponse(BaseModel):
question: str
answer: str
sources: List[Dict[str, Any]]
confidence_score: float
processing_time: float
class RAGService:
def __init__(self):
# Initialize components
self.vector_store = None
self.rag_pipeline = None
self.load_models()
def load_models(self):
"""Load RAG components"""
# Load vector store
vector_manager = VectorStoreManager()
self.vector_store = vector_manager.load_vector_store("./vector_db")
# Initialize RAG pipeline
self.rag_pipeline = RAGPipeline(self.vector_store)
def process_query(self, request: QueryRequest) -> QueryResponse:
"""Process a single query"""
import time
start_time = time.time()
try:
# Process query
result = self.rag_pipeline.query(
request.question,
k=request.k
)
processing_time = time.time() - start_time
# Calculate confidence score (simplified)
confidence_score = min(result['num_sources'] / request.k, 1.0)
return QueryResponse(
question=result['question'],
answer=result['answer'],
sources=result['source_documents'],
confidence_score=confidence_score,
processing_time=processing_time
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Initialize service
rag_service = RAGService()
@app.post("/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest):
"""Query the RAG system"""
return rag_service.process_query(request)
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
2. Monitoring and Logging
import logging
from datetime import datetime
import json
class RAGMonitor:
def __init__(self, log_file="rag_monitor.log"):
self.logger = logging.getLogger("RAGMonitor")
self.logger.setLevel(logging.INFO)
handler = logging.FileHandler(log_file)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.metrics = {
'total_queries': 0,
'avg_response_time': 0,
'avg_confidence': 0,
'error_count': 0
}
def log_query(self, request: QueryRequest, response: QueryResponse,
error: str = None):
"""Log query details"""
log_data = {
'timestamp': datetime.now().isoformat(),
'question': request.question,
'k': request.k,
'processing_time': response.processing_time if response else None,
'confidence_score': response.confidence_score if response else None,
'num_sources': len(response.sources) if response else 0,
'error': error
}
if error:
self.logger.error(f"Query failed: {json.dumps(log_data)}")
self.metrics['error_count'] += 1
else:
self.logger.info(f"Query processed: {json.dumps(log_data)}")
# Update metrics
self.metrics['total_queries'] += 1
# Update running averages
n = self.metrics['total_queries']
self.metrics['avg_response_time'] = (
(self.metrics['avg_response_time'] * (n-1) + response.processing_time) / n
)
self.metrics['avg_confidence'] = (
(self.metrics['avg_confidence'] * (n-1) + response.confidence_score) / n
)
def get_metrics(self) -> Dict[str, Any]:
"""Get current metrics"""
return self.metrics.copy()
def reset_metrics(self):
"""Reset metrics"""
self.metrics = {
'total_queries': 0,
'avg_response_time': 0,
'avg_confidence': 0,
'error_count': 0
}
# Usage in API (register this monitored version of /query instead of the simpler endpoint defined earlier)
monitor = RAGMonitor()
@app.post("/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest):
"""Query the RAG system with monitoring"""
try:
response = rag_service.process_query(request)
monitor.log_query(request, response)
return response
except Exception as e:
monitor.log_query(request, None, str(e))
raise HTTPException(status_code=500, detail=str(e))
@app.get("/metrics")
async def get_metrics():
"""Get system metrics"""
return monitor.get_metrics()
Best Practices and Common Pitfalls
Best Practices
- Document Quality
  - Clean and preprocess documents
  - Remove noise and irrelevant content
  - Maintain consistent formatting
- Chunking Strategy
  - Test different chunking approaches
  - Consider document structure
  - Balance chunk size and overlap
- Retrieval Optimization
  - Use hybrid search when possible
  - Implement query enhancement
  - Monitor retrieval quality
- Generation Quality
  - Use appropriate prompts
  - Implement safety filters
  - Validate generated content (a simple grounding check is sketched below)
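One lightweight way to act on the last point is a post-generation grounding check. The following is a rough heuristic sketch, not a replacement for the LLM-based faithfulness evaluation shown earlier: it flags answers whose content words barely overlap with the retrieved context, using the response format of RAGPipeline above (the 0.3 threshold is an arbitrary starting point to tune on your own data).

import re

def grounding_check(answer: str, context: str, min_overlap: float = 0.3) -> dict:
    """Flag answers whose content words rarely appear in the retrieved context."""
    def content_words(text):
        # Crude proxy for content words: lowercase alphabetic tokens of 4+ characters
        return set(re.findall(r"[a-z]{4,}", text.lower()))
    answer_terms = content_words(answer)
    if not answer_terms:
        return {"overlap": 0.0, "grounded": False}
    overlap = len(answer_terms & content_words(context)) / len(answer_terms)
    return {"overlap": round(overlap, 3), "grounded": overlap >= min_overlap}

# Usage: run the check before returning a RAG response to the user
result = rag_pipeline.query("What are the main benefits of machine learning?")
context = "\n\n".join(doc["content"] for doc in result["source_documents"])
check = grounding_check(result["answer"], context)
if not check["grounded"]:
    print(f"Answer may not be grounded in the sources (overlap={check['overlap']})")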
Common Pitfalls
- Poor Chunking
  - Breaking sentences or concepts apart (the diagnostic sketch after this list helps catch this)
  - Chunks that are too large or too small
  - Inconsistent chunk boundaries
- Retrieval Issues
  - Low-quality embeddings
  - Insufficient context
  - Irrelevant retrieved documents
- Generation Problems
  - Hallucination despite relevant context
  - Ignoring retrieved information
  - Inconsistent response quality
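Many of the chunking pitfalls can be caught by inspecting the chunks before indexing them. The snippet below is a simple sanity report over the chunks produced by DocumentProcessor earlier; the mid-sentence check is a rough heuristic and will flag some legitimate chunks (for example, lists or tables).

import numpy as np

def chunk_sanity_report(chunks):
    """Report chunk size statistics and flag chunks that end mid-sentence."""
    lengths = [len(chunk.page_content) for chunk in chunks]
    mid_sentence = [
        chunk.metadata.get("chunk_id", "unknown")
        for chunk in chunks
        if not chunk.page_content.rstrip().endswith((".", "!", "?", ":"))
    ]
    print(f"Chunks: {len(chunks)}")
    print(f"Length: min={min(lengths)}, median={int(np.median(lengths))}, max={max(lengths)}")
    print(f"Chunks ending mid-sentence: {len(mid_sentence)}")
    return mid_sentence

# Usage with the chunks created earlier
suspect_chunks = chunk_sanity_report(chunks)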
Conclusion
RAG represents a powerful paradigm for enhancing LLMs with external knowledge. Success depends on careful attention to each component:
- Document Processing: Quality chunking and indexing
- Retrieval: Effective search and ranking
- Generation: Faithful and relevant responses
- Evaluation: Comprehensive quality assessment
The field continues to evolve with new techniques for improving retrieval accuracy, generation quality, and system efficiency. Stay updated with the latest research and best practices to build effective RAG systems.