Retrieval-Augmented Generation (RAG): Enhancing LLMs with External Knowledge

Retrieval-Augmented Generation (RAG) is a powerful technique that combines the generative capabilities of language models with external knowledge retrieval, enabling more accurate, up-to-date, and factual responses.

Understanding RAG Architecture

Core Components

  1. Retriever: Finds relevant documents/passages
  2. Generator: Produces responses based on retrieved context
  3. Knowledge Base: External source of information
  4. Encoder: Converts text to vector representations

RAG Workflow

graph LR
    A[User Query] --> B[Query Encoding]
    B --> C[Document Retrieval]
    C --> D[Context Selection]
    D --> E[Prompt Construction]
    E --> F[LLM Generation]
    F --> G[Response]

    H[Document Corpus] --> I[Document Processing]
    I --> J[Chunking]
    J --> K[Embedding]
    K --> L[Vector Store]
    L --> C
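
The same two paths can be sketched in a few lines of framework-agnostic Python. This is only a minimal illustration of the diagram above; chunk_fn, embed, vector_store, and llm are placeholders for whichever chunker, embedding model, index, and language model you choose (concrete implementations follow in the next sections).

def index_documents(corpus, chunk_fn, embed, vector_store):
    """Offline path: document processing -> chunking -> embedding -> vector store."""
    for document in corpus:
        for chunk in chunk_fn(document):
            vector_store.add(vector=embed(chunk), text=chunk)

def answer_query(query, embed, vector_store, llm, k=5):
    """Online path: query encoding -> retrieval -> prompt construction -> generation."""
    query_vector = embed(query)                          # Query Encoding
    top_chunks = vector_store.search(query_vector, k=k)  # Document Retrieval / Context Selection
    context = "\n\n".join(top_chunks)
    prompt = f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"  # Prompt Construction
    return llm(prompt)                                   # LLM Generation -> Response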

Building a Basic RAG System

1. Document Processing and Indexing

from typing import List, Dict, Any
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

class DocumentProcessor:
    def __init__(self, chunk_size=1000, chunk_overlap=200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
        )

    def load_documents(self, file_paths: List[str]) -> List[Dict]:
        """Load documents from various file formats"""
        documents = []

        for file_path in file_paths:
            try:
                if file_path.endswith('.pdf'):
                    loader = PyPDFLoader(file_path)
                elif file_path.endswith('.txt'):
                    loader = TextLoader(file_path)
                else:
                    print(f"Unsupported file format: {file_path}")
                    continue

                docs = loader.load()
                for doc in docs:
                    doc.metadata['source'] = file_path
                documents.extend(docs)

            except Exception as e:
                print(f"Error loading {file_path}: {e}")

        return documents

    def chunk_documents(self, documents: List[Dict]) -> List[Dict]:
        """Split documents into chunks"""
        chunks = []

        for doc in documents:
            doc_chunks = self.text_splitter.split_documents([doc])

            # Add chunk metadata
            for i, chunk in enumerate(doc_chunks):
                chunk.metadata.update({
                    'chunk_id': f"{doc.metadata.get('source', 'unknown')}_{i}",
                    'chunk_index': i,
                    'total_chunks': len(doc_chunks)
                })

            chunks.extend(doc_chunks)

        return chunks

    def process_documents(self, file_paths: List[str]) -> List[Dict]:
        """Complete document processing pipeline"""
        print("Loading documents...")
        documents = self.load_documents(file_paths)

        print(f"Loaded {len(documents)} documents")
        print("Chunking documents...")
        chunks = self.chunk_documents(documents)

        print(f"Created {len(chunks)} chunks")
        return chunks

# Usage
processor = DocumentProcessor(chunk_size=1000, chunk_overlap=200)
chunks = processor.process_documents([
    "document1.pdf",
    "document2.txt",
    "document3.pdf"
])

2. Vector Store Creation

from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma, FAISS

class VectorStoreManager:
    def __init__(self, embedding_model="sentence-transformers/all-MiniLM-L6-v2"):
        self.embeddings = HuggingFaceEmbeddings(
            model_name=embedding_model,
            model_kwargs={'device': 'cpu'},
            encode_kwargs={'normalize_embeddings': True}
        )
        self.vector_store = None

    def create_vector_store(self, chunks: List[Dict], persist_directory=None):
        """Create vector store from document chunks"""
        print("Creating embeddings...")

        # Extract text and metadata
        texts = [chunk.page_content for chunk in chunks]
        metadatas = [chunk.metadata for chunk in chunks]

        # Create vector store
        if persist_directory:
            self.vector_store = Chroma.from_texts(
                texts=texts,
                metadatas=metadatas,
                embedding=self.embeddings,
                persist_directory=persist_directory
            )
            self.vector_store.persist()
        else:
            self.vector_store = FAISS.from_texts(
                texts=texts,
                metadatas=metadatas,
                embedding=self.embeddings
            )

        print(f"Vector store created with {len(texts)} documents")
        return self.vector_store

    def load_vector_store(self, persist_directory):
        """Load existing vector store"""
        self.vector_store = Chroma(
            persist_directory=persist_directory,
            embedding_function=self.embeddings
        )
        return self.vector_store

    def add_documents(self, chunks: List[Dict]):
        """Add new documents to existing vector store"""
        if not self.vector_store:
            raise ValueError("Vector store not initialized")

        texts = [chunk.page_content for chunk in chunks]
        metadatas = [chunk.metadata for chunk in chunks]

        self.vector_store.add_texts(texts=texts, metadatas=metadatas)

        if hasattr(self.vector_store, 'persist'):
            self.vector_store.persist()

    def search(self, query: str, k: int = 5, score_threshold: float = None):
        """Search for relevant documents"""
        if not self.vector_store:
            raise ValueError("Vector store not initialized")

        if score_threshold is not None:
            results = self.vector_store.similarity_search_with_score(
                query, k=k
            )
            # Filter by score threshold. Note: score semantics depend on the
            # backend (FAISS and Chroma return distances by default, where
            # lower means more similar), so adjust the comparison accordingly.
            filtered_results = [
                (doc, score) for doc, score in results
                if score >= score_threshold
            ]
            return [doc for doc, score in filtered_results]
        else:
            return self.vector_store.similarity_search(query, k=k)

# Usage
vector_manager = VectorStoreManager()
vector_store = vector_manager.create_vector_store(
    chunks,
    persist_directory="./vector_db"
)

3. RAG Pipeline Implementation

from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

class RAGPipeline:
    def __init__(self, vector_store, llm_model="gpt-3.5-turbo", temperature=0.1):
        self.vector_store = vector_store
        self.llm = ChatOpenAI(model_name=llm_model, temperature=temperature)
        self.retriever = vector_store.as_retriever()

        # Custom prompt template
        self.prompt_template = PromptTemplate(
            template="""Use the following pieces of context to answer the question at the end.
            If you don't know the answer based on the context, just say that you don't know,
            don't try to make up an answer.

            Context:
            {context}

            Question: {question}

            Answer: """,
            input_variables=["context", "question"]
        )

        # Create QA chain
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.retriever,
            chain_type_kwargs={"prompt": self.prompt_template},
            return_source_documents=True
        )

    def query(self, question: str, k: int = 5) -> Dict[str, Any]:
        """Process a query through the RAG pipeline"""

        # Update retriever parameters
        self.retriever.search_kwargs = {"k": k}

        # Get response
        result = self.qa_chain({"query": question})

        # Format response
        response = {
            "question": question,
            "answer": result["result"],
            "source_documents": [
                {
                    "content": doc.page_content,
                    "metadata": doc.metadata,
                    "source": doc.metadata.get("source", "unknown")
                }
                for doc in result["source_documents"]
            ],
            "num_sources": len(result["source_documents"])
        }

        return response

    def batch_query(self, questions: List[str]) -> List[Dict[str, Any]]:
        """Process multiple queries"""
        results = []
        for question in questions:
            result = self.query(question)
            results.append(result)
        return results

    def update_prompt(self, new_template: str):
        """Update the prompt template"""
        self.prompt_template = PromptTemplate(
            template=new_template,
            input_variables=["context", "question"]
        )

        # Recreate QA chain with new prompt
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.retriever,
            chain_type_kwargs={"prompt": self.prompt_template},
            return_source_documents=True
        )

# Usage
rag_pipeline = RAGPipeline(vector_store)
response = rag_pipeline.query("What are the main benefits of machine learning?")
print(f"Answer: {response['answer']}")
print(f"Sources: {response['num_sources']}")

Advanced RAG Techniques

1. Hybrid Search (Dense + Sparse)

from rank_bm25 import BM25Okapi
import numpy as np

class HybridRetriever:
    def __init__(self, documents, embeddings_model, alpha=0.5):
        self.documents = documents
        self.embeddings_model = embeddings_model
        self.alpha = alpha  # Weight for dense vs sparse

        # Prepare texts
        self.texts = [doc.page_content for doc in documents]

        # Initialize BM25 (sparse retrieval)
        tokenized_texts = [text.split() for text in self.texts]
        self.bm25 = BM25Okapi(tokenized_texts)

        # Create dense embeddings
        self.dense_embeddings = self.embeddings_model.embed_documents(self.texts)
        self.dense_embeddings = np.array(self.dense_embeddings)

    def search(self, query: str, k: int = 5) -> List[Dict]:
        """Hybrid search combining dense and sparse retrieval"""

        # Dense retrieval
        query_embedding = self.embeddings_model.embed_query(query)
        query_embedding = np.array(query_embedding)

        # Calculate cosine similarity
        dense_scores = np.dot(self.dense_embeddings, query_embedding) / (
            np.linalg.norm(self.dense_embeddings, axis=1) * np.linalg.norm(query_embedding)
        )

        # Sparse retrieval (BM25)
        query_tokens = query.split()
        sparse_scores = self.bm25.get_scores(query_tokens)

        # Min-max normalize both score sets to [0, 1] so they are comparable
        # (epsilon guards against division by zero when all scores are equal)
        eps = 1e-9
        dense_scores = (dense_scores - dense_scores.min()) / (dense_scores.max() - dense_scores.min() + eps)
        sparse_scores = (sparse_scores - sparse_scores.min()) / (sparse_scores.max() - sparse_scores.min() + eps)

        # Combine scores
        hybrid_scores = self.alpha * dense_scores + (1 - self.alpha) * sparse_scores

        # Get top-k results
        top_indices = np.argsort(hybrid_scores)[::-1][:k]

        results = []
        for idx in top_indices:
            results.append({
                'document': self.documents[idx],
                'score': hybrid_scores[idx],
                'dense_score': dense_scores[idx],
                'sparse_score': sparse_scores[idx]
            })

        return results

# Usage
hybrid_retriever = HybridRetriever(chunks, vector_manager.embeddings)
results = hybrid_retriever.search("machine learning applications", k=5)

2. Multi-Step Reasoning RAG

class MultiStepRAG:
    def __init__(self, vector_store, llm):
        self.vector_store = vector_store
        self.llm = llm

        # Decomposition prompt
        self.decompose_prompt = """
        Break down this complex question into simpler sub-questions that can be answered independently:

        Question: {question}

        Sub-questions:
        1.
        2.
        3.
        """

        # Synthesis prompt
        self.synthesis_prompt = """
        Based on the following sub-questions and their answers, provide a comprehensive answer to the original question:

        Original Question: {original_question}

        Sub-questions and Answers:
        {sub_qa_pairs}

        Comprehensive Answer:
        """

    def decompose_question(self, question: str) -> List[str]:
        """Break complex question into sub-questions"""
        prompt = self.decompose_prompt.format(question=question)
        response = self.llm(prompt)

        # Parse sub-questions (simple parsing)
        lines = response.strip().split('\n')
        sub_questions = []

        for line in lines:
            line = line.strip()
            if line and line.startswith(('1.', '2.', '3.', '-', '•')):
                # Strip the leading list marker ("1.", "-", "•", ...)
                if line[0].isdigit():
                    sub_question = line.split('.', 1)[-1].strip()
                else:
                    sub_question = line[1:].strip()
                if sub_question:
                    sub_questions.append(sub_question)

        return sub_questions

    def answer_sub_question(self, sub_question: str) -> Dict[str, Any]:
        """Answer a single sub-question using RAG"""
        # Retrieve relevant documents
        relevant_docs = self.vector_store.similarity_search(sub_question, k=3)

        # Create context
        context = "\n\n".join([doc.page_content for doc in relevant_docs])

        # Generate answer
        prompt = f"""
        Context: {context}

        Question: {sub_question}

        Answer based on the context:
        """

        answer = self.llm(prompt)

        return {
            'question': sub_question,
            'answer': answer.strip(),
            'sources': relevant_docs
        }

    def synthesize_answer(self, original_question: str, sub_qa_pairs: List[Dict]) -> str:
        """Synthesize final answer from sub-answers"""

        # Format sub-Q&A pairs
        formatted_pairs = []
        for i, pair in enumerate(sub_qa_pairs, 1):
            formatted_pairs.append(f"Q{i}: {pair['question']}\nA{i}: {pair['answer']}")

        sub_qa_text = "\n\n".join(formatted_pairs)

        # Generate synthesis
        prompt = self.synthesis_prompt.format(
            original_question=original_question,
            sub_qa_pairs=sub_qa_text
        )

        final_answer = self.llm(prompt)
        return final_answer.strip()

    def query(self, question: str) -> Dict[str, Any]:
        """Process complex query with multi-step reasoning"""

        # Step 1: Decompose question
        sub_questions = self.decompose_question(question)

        if not sub_questions:
            # Fallback to simple RAG
            relevant_docs = self.vector_store.similarity_search(question, k=5)
            context = "\n\n".join([doc.page_content for doc in relevant_docs])

            prompt = f"""
            Context: {context}
            Question: {question}
            Answer:
            """

            answer = self.llm(prompt)

            return {
                'question': question,
                'answer': answer.strip(),
                'method': 'simple_rag',
                'sources': relevant_docs
            }

        # Step 2: Answer sub-questions
        sub_answers = []
        all_sources = []

        for sub_q in sub_questions:
            sub_result = self.answer_sub_question(sub_q)
            sub_answers.append(sub_result)
            all_sources.extend(sub_result['sources'])

        # Step 3: Synthesize final answer
        final_answer = self.synthesize_answer(question, sub_answers)

        return {
            'question': question,
            'answer': final_answer,
            'method': 'multi_step',
            'sub_questions': sub_questions,
            'sub_answers': sub_answers,
            'sources': all_sources
        }

# Usage (assumes `llm` is a LangChain LLM that accepts a plain string prompt, e.g. langchain.llms.OpenAI)
multi_step_rag = MultiStepRAG(vector_store, llm)
result = multi_step_rag.query(
    "How do machine learning algorithms impact healthcare outcomes and what are the ethical considerations?"
)

3. Self-Reflective RAG

class SelfReflectiveRAG:
    def __init__(self, vector_store, llm):
        self.vector_store = vector_store
        self.llm = llm

        self.reflection_prompt = """
        Evaluate the following answer to determine if it adequately addresses the question:

        Question: {question}
        Answer: {answer}
        Retrieved Context: {context}

        Evaluation Criteria:
        1. Completeness: Does the answer fully address all aspects of the question?
        2. Accuracy: Is the answer consistent with the provided context?
        3. Relevance: Is the answer directly relevant to the question?
        4. Clarity: Is the answer clear and well-structured?

        Provide a score from 1-10 and explain what could be improved:

        Score:
        Improvements needed:
        """

        self.refinement_prompt = """
        Based on the evaluation feedback, improve the following answer:

        Original Question: {question}
        Original Answer: {original_answer}
        Context: {context}
        Feedback: {feedback}

        Improved Answer:
        """

    def evaluate_answer(self, question: str, answer: str, context: str) -> Dict[str, Any]:
        """Evaluate the quality of an answer"""

        prompt = self.reflection_prompt.format(
            question=question,
            answer=answer,
            context=context
        )

        evaluation = self.llm(prompt)

        # Parse score (simple parsing)
        lines = evaluation.split('\n')
        score = None
        feedback = evaluation

        for line in lines:
            if line.strip().startswith('Score:'):
                try:
                    score = int(line.split(':')[1].strip().split()[0])
                except (IndexError, ValueError):
                    score = 5  # Default score if parsing fails
                break

        return {
            'score': score or 5,
            'feedback': feedback
        }

    def refine_answer(self, question: str, original_answer: str,
                     context: str, feedback: str) -> str:
        """Refine answer based on feedback"""

        prompt = self.refinement_prompt.format(
            question=question,
            original_answer=original_answer,
            context=context,
            feedback=feedback
        )

        refined_answer = self.llm(prompt)
        return refined_answer.strip()

    def query(self, question: str, max_iterations: int = 2,
             min_score: int = 7) -> Dict[str, Any]:
        """Query with self-reflection and refinement"""

        # Initial retrieval and generation
        relevant_docs = self.vector_store.similarity_search(question, k=5)
        context = "\n\n".join([doc.page_content for doc in relevant_docs])

        # Initial answer
        initial_prompt = f"""
        Context: {context}
        Question: {question}
        Answer:
        """

        current_answer = self.llm(initial_prompt).strip()
        iterations = []

        for iteration in range(max_iterations):
            # Evaluate current answer
            evaluation = self.evaluate_answer(question, current_answer, context)

            iterations.append({
                'iteration': iteration + 1,
                'answer': current_answer,
                'score': evaluation['score'],
                'feedback': evaluation['feedback']
            })

            # Check if answer is good enough
            if evaluation['score'] >= min_score:
                break

            # Refine answer
            if iteration < max_iterations - 1:  # Don't refine on last iteration
                current_answer = self.refine_answer(
                    question, current_answer, context, evaluation['feedback']
                )

        return {
            'question': question,
            'final_answer': current_answer,
            'iterations': iterations,
            'sources': relevant_docs,
            'final_score': iterations[-1]['score']
        }

# Usage
reflective_rag = SelfReflectiveRAG(vector_store, llm)
result = reflective_rag.query(
    "What are the key principles of sustainable development?",
    max_iterations=3,
    min_score=8
)

RAG Optimization Strategies

1. Chunking Optimization

import numpy as np

class AdaptiveChunker:
    def __init__(self):
        self.strategies = {
            'fixed': self._fixed_chunking,
            'semantic': self._semantic_chunking,
            'sliding_window': self._sliding_window_chunking,
            'hierarchical': self._hierarchical_chunking
        }

    def _fixed_chunking(self, text: str, chunk_size: int = 1000,
                       overlap: int = 200) -> List[str]:
        """Traditional fixed-size chunking"""
        chunks = []
        start = 0

        while start < len(text):
            end = start + chunk_size
            chunk = text[start:end]

            # Try to break at a sentence boundary (break_point is an index
            # within the current chunk, not within the full text)
            if end < len(text):
                last_period = chunk.rfind('.')
                last_newline = chunk.rfind('\n')
                break_point = max(last_period, last_newline)

                if break_point > chunk_size * 0.5:  # Keep at least 50% of the chunk
                    chunk = text[start:start + break_point + 1]
                    end = start + break_point + 1

            chunks.append(chunk.strip())
            start = end - overlap

        return chunks

    def _semantic_chunking(self, text: str, embeddings_model,
                          similarity_threshold: float = 0.8) -> List[str]:
        """Chunk based on semantic similarity"""
        sentences = text.split('.')
        if len(sentences) < 2:
            return [text]

        # Get embeddings for sentences
        sentence_embeddings = embeddings_model.embed_documents(sentences)

        chunks = []
        current_chunk = [sentences[0]]
        current_indices = [0]  # Indices of the sentences in the current chunk

        for i in range(1, len(sentences)):
            # Mean embedding of the sentences that are actually in the current chunk
            chunk_embedding = np.mean(
                [sentence_embeddings[j] for j in current_indices], axis=0
            )

            sentence_embedding = sentence_embeddings[i]
            similarity = np.dot(chunk_embedding, sentence_embedding) / (
                np.linalg.norm(chunk_embedding) * np.linalg.norm(sentence_embedding)
            )

            if similarity >= similarity_threshold:
                current_chunk.append(sentences[i])
                current_indices.append(i)
            else:
                # Start new chunk
                chunks.append('.'.join(current_chunk) + '.')
                current_chunk = [sentences[i]]
                current_indices = [i]

        # Add final chunk
        if current_chunk:
            chunks.append('.'.join(current_chunk) + '.')

        return chunks

    def _sliding_window_chunking(self, text: str, window_size: int = 1000,
                               step_size: int = 500) -> List[str]:
        """Sliding window chunking with overlap"""
        chunks = []
        start = 0

        while start < len(text):
            end = min(start + window_size, len(text))
            chunk = text[start:end]
            chunks.append(chunk.strip())

            if end == len(text):
                break

            start += step_size

        return chunks

    def _hierarchical_chunking(self, text: str) -> Dict[str, List[str]]:
        """Create hierarchical chunks at different levels"""

        # Level 1: Paragraphs
        paragraphs = text.split('\n\n')

        # Level 2: Sentences within paragraphs
        sentences = []
        for para in paragraphs:
            para_sentences = para.split('.')
            sentences.extend([s.strip() + '.' for s in para_sentences if s.strip()])

        # Level 3: Fixed-size chunks
        fixed_chunks = self._fixed_chunking(text, chunk_size=500, overlap=100)

        return {
            'paragraphs': paragraphs,
            'sentences': sentences,
            'fixed_chunks': fixed_chunks
        }

    def optimize_chunking(self, documents: List[str],
                         test_queries: List[str],
                         embeddings_model) -> Dict[str, Any]:
        """Find optimal chunking strategy for given documents and queries"""

        results = {}

        for strategy_name, strategy_func in self.strategies.items():
            if strategy_name == 'hierarchical':
                continue  # Skip for this optimization

            print(f"Testing {strategy_name} chunking...")

            # Apply chunking strategy
            all_chunks = []
            for doc in documents:
                if strategy_name == 'semantic':
                    chunks = strategy_func(doc, embeddings_model)
                else:
                    chunks = strategy_func(doc)
                all_chunks.extend(chunks)

            # Create temporary vector store
            temp_embeddings = embeddings_model.embed_documents(all_chunks)

            # Test retrieval quality
            retrieval_scores = []
            for query in test_queries:
                query_embedding = embeddings_model.embed_query(query)

                # Calculate similarities
                similarities = [
                    np.dot(query_embedding, chunk_emb) / (
                        np.linalg.norm(query_embedding) * np.linalg.norm(chunk_emb)
                    )
                    for chunk_emb in temp_embeddings
                ]

                # Get top-5 average similarity
                top_similarities = sorted(similarities, reverse=True)[:5]
                avg_similarity = np.mean(top_similarities)
                retrieval_scores.append(avg_similarity)

            results[strategy_name] = {
                'avg_retrieval_score': np.mean(retrieval_scores),
                'num_chunks': len(all_chunks),
                'avg_chunk_length': np.mean([len(chunk) for chunk in all_chunks])
            }

        # Find best strategy
        best_strategy = max(results.keys(),
                          key=lambda x: results[x]['avg_retrieval_score'])

        return {
            'results': results,
            'best_strategy': best_strategy,
            'best_score': results[best_strategy]['avg_retrieval_score']
        }

# Usage
chunker = AdaptiveChunker()
optimization_results = chunker.optimize_chunking(
    documents=sample_documents,
    test_queries=sample_queries,
    embeddings_model=embeddings_model
)

2. Query Enhancement

class QueryEnhancer:
    def __init__(self, llm):
        self.llm = llm

        self.expansion_prompt = """
        Expand the following query with related terms and concepts that would help find relevant information:

        Original Query: {query}

        Expanded Query (include synonyms, related terms, and context):
        """

        self.reformulation_prompt = """
        Reformulate the following query in different ways to capture various aspects:

        Original Query: {query}

        Alternative formulations:
        1.
        2.
        3.
        """

    def expand_query(self, query: str) -> str:
        """Expand query with related terms"""
        prompt = self.expansion_prompt.format(query=query)
        expanded = self.llm(prompt)
        return expanded.strip()

    def reformulate_query(self, query: str) -> List[str]:
        """Generate alternative query formulations"""
        prompt = self.reformulation_prompt.format(query=query)
        response = self.llm(prompt)

        # Parse reformulations
        lines = response.strip().split('\n')
        reformulations = []

        for line in lines:
            line = line.strip()
            if line and line.startswith(('1.', '2.', '3.', '-', '•')):
                # Strip the leading list marker ("1.", "-", "•", ...)
                if line[0].isdigit():
                    reformulation = line.split('.', 1)[-1].strip()
                else:
                    reformulation = line[1:].strip()
                if reformulation:
                    reformulations.append(reformulation)

        return reformulations

    def enhance_query(self, query: str, strategy: str = 'expansion') -> List[str]:
        """Enhance query using specified strategy"""

        if strategy == 'expansion':
            enhanced = self.expand_query(query)
            return [query, enhanced]

        elif strategy == 'reformulation':
            reformulations = self.reformulate_query(query)
            return [query] + reformulations

        elif strategy == 'both':
            expanded = self.expand_query(query)
            reformulations = self.reformulate_query(query)
            return [query, expanded] + reformulations

        else:
            return [query]

# Usage
query_enhancer = QueryEnhancer(llm)
enhanced_queries = query_enhancer.enhance_query(
    "machine learning in healthcare",
    strategy='both'
)

RAG Evaluation and Monitoring

1. RAG-Specific Metrics

import numpy as np

class RAGEvaluator:
    def __init__(self, llm):
        self.llm = llm

        self.faithfulness_prompt = """
        Evaluate if the answer is faithful to the given context. The answer should not contain information that contradicts or goes beyond what's stated in the context.

        Context: {context}
        Answer: {answer}

        Is the answer faithful to the context? (Yes/No)
        Explanation:
        """

        self.relevance_prompt = """
        Evaluate how relevant the retrieved context is to answering the given question.

        Question: {question}
        Context: {context}

        Relevance score (1-5, where 5 is highly relevant):
        Explanation:
        """

    def evaluate_faithfulness(self, context: str, answer: str) -> Dict[str, Any]:
        """Evaluate if answer is faithful to context"""
        prompt = self.faithfulness_prompt.format(context=context, answer=answer)
        response = self.llm(prompt)

        # Parse response
        is_faithful = 'yes' in response.lower().split('\n')[0]

        return {
            'is_faithful': is_faithful,
            'explanation': response
        }

    def evaluate_context_relevance(self, question: str, context: str) -> Dict[str, Any]:
        """Evaluate relevance of retrieved context"""
        prompt = self.relevance_prompt.format(question=question, context=context)
        response = self.llm(prompt)

        # Parse score
        lines = response.split('\n')
        score = 3  # Default

        for line in lines:
            if 'score' in line.lower():
                try:
                    score = int(line.split(':')[1].strip().split()[0])
                except (IndexError, ValueError):
                    pass
                break

        return {
            'relevance_score': score,
            'explanation': response
        }

    def evaluate_rag_response(self, question: str, answer: str,
                            context: str, ground_truth: str = None) -> Dict[str, Any]:
        """Comprehensive RAG evaluation"""

        results = {
            'question': question,
            'answer': answer,
            'context': context
        }

        # Faithfulness
        faithfulness = self.evaluate_faithfulness(context, answer)
        results['faithfulness'] = faithfulness

        # Context relevance
        relevance = self.evaluate_context_relevance(question, context)
        results['context_relevance'] = relevance

        # Answer relevance (how well answer addresses question)
        answer_relevance = self.evaluate_context_relevance(question, answer)
        results['answer_relevance'] = answer_relevance

        # If ground truth available, calculate similarity
        if ground_truth:
            # Use BERTScore or similar
            from bert_score import score
            _, _, f1 = score([answer], [ground_truth], lang='en')
            results['similarity_to_ground_truth'] = f1.item()

        # Overall score
        scores = [
            1.0 if faithfulness['is_faithful'] else 0.0,
            relevance['relevance_score'] / 5.0,
            answer_relevance['relevance_score'] / 5.0
        ]

        if ground_truth:
            scores.append(results['similarity_to_ground_truth'])

        results['overall_score'] = np.mean(scores)

        return results

    def batch_evaluate(self, test_cases: List[Dict]) -> Dict[str, Any]:
        """Evaluate multiple RAG responses"""

        all_results = []

        for case in test_cases:
            result = self.evaluate_rag_response(
                question=case['question'],
                answer=case['answer'],
                context=case['context'],
                ground_truth=case.get('ground_truth')
            )
            all_results.append(result)

        # Aggregate metrics
        faithfulness_scores = [
            1.0 if r['faithfulness']['is_faithful'] else 0.0
            for r in all_results
        ]

        context_relevance_scores = [
            r['context_relevance']['relevance_score'] / 5.0
            for r in all_results
        ]

        answer_relevance_scores = [
            r['answer_relevance']['relevance_score'] / 5.0
            for r in all_results
        ]

        overall_scores = [r['overall_score'] for r in all_results]

        summary = {
            'num_cases': len(test_cases),
            'avg_faithfulness': np.mean(faithfulness_scores),
            'avg_context_relevance': np.mean(context_relevance_scores),
            'avg_answer_relevance': np.mean(answer_relevance_scores),
            'avg_overall_score': np.mean(overall_scores),
            'detailed_results': all_results
        }

        return summary

# Usage
evaluator = RAGEvaluator(llm)
test_cases = [
    {
        'question': 'What is machine learning?',
        'answer': 'Machine learning is a subset of AI...',
        'context': 'Machine learning algorithms learn patterns...',
        'ground_truth': 'Machine learning is a method of data analysis...'
    }
]

evaluation_results = evaluator.batch_evaluate(test_cases)

Production Deployment

1. RAG API Service

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
import uvicorn

app = FastAPI(title="RAG API Service")

class QueryRequest(BaseModel):
    question: str
    k: int = 5
    score_threshold: Optional[float] = None

class QueryResponse(BaseModel):
    question: str
    answer: str
    sources: List[Dict[str, Any]]
    confidence_score: float
    processing_time: float

class RAGService:
    def __init__(self):
        # Initialize components
        self.vector_store = None
        self.rag_pipeline = None
        self.load_models()

    def load_models(self):
        """Load RAG components"""
        # Load vector store
        vector_manager = VectorStoreManager()
        self.vector_store = vector_manager.load_vector_store("./vector_db")

        # Initialize RAG pipeline
        self.rag_pipeline = RAGPipeline(self.vector_store)

    def process_query(self, request: QueryRequest) -> QueryResponse:
        """Process a single query"""
        import time
        start_time = time.time()

        try:
            # Process query
            result = self.rag_pipeline.query(
                request.question,
                k=request.k
            )

            processing_time = time.time() - start_time

            # Calculate confidence score (simplified)
            confidence_score = min(result['num_sources'] / request.k, 1.0)

            return QueryResponse(
                question=result['question'],
                answer=result['answer'],
                sources=result['source_documents'],
                confidence_score=confidence_score,
                processing_time=processing_time
            )

        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

# Initialize service
rag_service = RAGService()

@app.post("/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest):
    """Query the RAG system"""
    return rag_service.process_query(request)

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

2. Monitoring and Logging

import logging
from datetime import datetime
import json

class RAGMonitor:
    def __init__(self, log_file="rag_monitor.log"):
        self.logger = logging.getLogger("RAGMonitor")
        self.logger.setLevel(logging.INFO)

        handler = logging.FileHandler(log_file)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

        self.metrics = {
            'total_queries': 0,
            'avg_response_time': 0,
            'avg_confidence': 0,
            'error_count': 0
        }

    def log_query(self, request: QueryRequest, response: Optional[QueryResponse],
                  error: str = None):
        """Log query details"""

        log_data = {
            'timestamp': datetime.now().isoformat(),
            'question': request.question,
            'k': request.k,
            'processing_time': response.processing_time if response else None,
            'confidence_score': response.confidence_score if response else None,
            'num_sources': len(response.sources) if response else 0,
            'error': error
        }

        if error:
            self.logger.error(f"Query failed: {json.dumps(log_data)}")
            self.metrics['error_count'] += 1
        else:
            self.logger.info(f"Query processed: {json.dumps(log_data)}")

            # Update metrics
            self.metrics['total_queries'] += 1

            # Update running averages
            n = self.metrics['total_queries']
            self.metrics['avg_response_time'] = (
                (self.metrics['avg_response_time'] * (n-1) + response.processing_time) / n
            )
            self.metrics['avg_confidence'] = (
                (self.metrics['avg_confidence'] * (n-1) + response.confidence_score) / n
            )

    def get_metrics(self) -> Dict[str, Any]:
        """Get current metrics"""
        return self.metrics.copy()

    def reset_metrics(self):
        """Reset metrics"""
        self.metrics = {
            'total_queries': 0,
            'avg_response_time': 0,
            'avg_confidence': 0,
            'error_count': 0
        }

# Usage in API (this monitored endpoint replaces the earlier /query definition)
monitor = RAGMonitor()

@app.post("/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest):
    """Query the RAG system with monitoring"""
    try:
        response = rag_service.process_query(request)
        monitor.log_query(request, response)
        return response
    except Exception as e:
        monitor.log_query(request, None, str(e))
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/metrics")
async def get_metrics():
    """Get system metrics"""
    return monitor.get_metrics()

Best Practices and Common Pitfalls

Best Practices

  1. Document Quality

    • Clean and preprocess documents
    • Remove noise and irrelevant content
    • Maintain consistent formatting
  2. Chunking Strategy

    • Test different chunking approaches
    • Consider document structure
    • Balance chunk size and overlap
  3. Retrieval Optimization

    • Use hybrid search when possible
    • Implement query enhancement
    • Monitor retrieval quality
  4. Generation Quality

    • Use appropriate prompts
    • Implement safety filters
    • Validate generated content (a minimal validation sketch follows this list)
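
As a concrete illustration of the last point, below is a minimal post-generation validation sketch. The heuristics (non-empty answer, refusal detection, lexical overlap with the retrieved context) are illustrative placeholders only; in practice, replace them with domain-specific validators or an LLM-based judge such as the RAGEvaluator shown earlier.

def validate_answer(answer: str, context: str, min_overlap: float = 0.2) -> dict:
    """Cheap sanity checks on a generated answer (illustrative heuristics only)."""
    answer_tokens = set(answer.lower().split())
    context_tokens = set(context.lower().split())

    # Fraction of answer tokens that also appear in the retrieved context;
    # very low overlap can signal content not supported by the sources
    overlap = (
        len(answer_tokens & context_tokens) / len(answer_tokens)
        if answer_tokens else 0.0
    )

    return {
        'non_empty': bool(answer.strip()),
        'is_refusal': "don't know" in answer.lower(),
        'grounded_in_context': overlap >= min_overlap,
    }

# Usage with the RAGPipeline response shown earlier
checks = validate_answer(
    response['answer'],
    "\n\n".join(doc['content'] for doc in response['source_documents'])
)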

Common Pitfalls

  1. Poor Chunking

    • Breaking sentences or concepts
    • Too large or too small chunks
    • Inconsistent chunk boundaries
  2. Retrieval Issues

    • Low-quality embeddings
    • Insufficient context
    • Irrelevant retrieved documents
  3. Generation Problems

    • Hallucination despite context
    • Ignoring retrieved information
    • Inconsistent response quality

Conclusion

RAG represents a powerful paradigm for enhancing LLMs with external knowledge. Success depends on careful attention to each component:

  • Document Processing: Quality chunking and indexing
  • Retrieval: Effective search and ranking
  • Generation: Faithful and relevant responses
  • Evaluation: Comprehensive quality assessment

The field continues to evolve with new techniques for improving retrieval accuracy, generation quality, and system efficiency. Stay updated with the latest research and best practices to build effective RAG systems.
