Building a RAG Pipeline
Now comes the exciting part! In the previous tutorials, you've learned all the essential building blocks:
- Tutorial 4.1: Understanding RAG concepts and how it solves the knowledge gap problem
- Tutorial 4.2: Creating text embeddings that capture semantic meaning
- Tutorial 4.3: Storing vectors in PostgreSQL and performing similarity searches with SQL
In this tutorial, we'll combine all these concepts into a complete, working RAG pipeline using PostgreSQL as our vector database. Think of this as assembling all the individual components you've mastered into a powerful, unified system that can answer questions using your own documents.
By the end of this tutorial, you'll have a fully functional RAG system that can ingest documents, create embeddings, store them in PostgreSQL, and answer user questions with relevant context using SQL-based similarity search.
What is a RAG Pipeline?
A RAG pipeline is a complete system that orchestrates all the steps needed to provide intelligent, context-aware responses. It's like building an intelligent research assistant that can quickly read through your documents and provide accurate answers.
Let's break down what "pipeline" means in this context:
graph TD
A[Documents] --> B[Text Processing]
B --> C[Create Embeddings]
C --> D[PostgreSQL Storage]
D --> E[User Query]
E --> F[SQL Similarity Search]
F --> G[Retrieve Context]
G --> H[Generate Answer]
H --> I[Response]
A pipeline is a series of connected steps where the output of one step becomes the input for the next. In RAG, we have two main pipelines:
1. Ingestion Pipeline (Preprocessing)
This runs once when you add new documents:
- Input: Raw documents (text files, PDFs, web pages)
- Process: Split into chunks → Create embeddings → Store in PostgreSQL with pgvector
- Output: Searchable vector database
2. Query Pipeline (Runtime)
This runs every time a user asks a question:
- Input: User question
- Process: Create query embedding → Execute SQL similarity search → Retrieve context → Generate answer
- Output: AI-generated response with relevant context
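To make the split concrete, here is a minimal sketch of the two pipelines as plain function signatures. The names are illustrative only; later in this tutorial we implement the real versions as methods on a PostgreSQLRAGPipeline class.
// Illustrative sketch — the real implementations follow later in this tutorial

// Ingestion pipeline: runs once per document
async function ingest(content: string, source: string): Promise<void> {
  // 1. split the content into chunks
  // 2. create an embedding for each chunk
  // 3. INSERT chunk + embedding + metadata into PostgreSQL
}

// Query pipeline: runs for every user question
async function answer(question: string): Promise<string> {
  // 1. embed the question with the same model
  // 2. run a SQL similarity search to fetch the top matching chunks
  // 3. pass the chunks as context to the LLM and return its answer
  return 'generated answer';
}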
How RAG Pipelines Work
Let's understand the complete flow with a practical example. Imagine you have documentation about TypeScript and a user asks: "How do I handle async/await errors?"
Step-by-Step Process:
Phase 1: Document Ingestion (Done Once)
- Document Input: "TypeScript async/await error handling involves using try-catch blocks..."
- Text Chunking: Split into manageable pieces (we'll implement this)
- Embedding Creation: Convert chunks to vectors using Gemini
- PostgreSQL Storage: Store embeddings with metadata using SQL INSERT
Phase 2: Query Processing (Per Question)
- Query Input: "How do I handle async/await errors?"
- Query Embedding: Convert question to vector using same model
- SQL Similarity Search: Execute a PostgreSQL query with the <=> operator (previewed in the snippet after this list)
- Context Retrieval: Get the original text of the top matching chunks
- Answer Generation: Use LLM with retrieved context to generate response
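To preview what that SQL step looks like (the full, parameterized version is built in Step 6, against a document_chunks table we create shortly), the core query is a nearest-neighbour search ordered by the <=> cosine distance operator:
-- Preview only — the complete version appears in Step 6
SELECT content, 1 - (embedding <=> '[...query embedding...]') AS similarity
FROM document_chunks
ORDER BY embedding <=> '[...query embedding...]'
LIMIT 3;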
Key Components Integration:
graph TD
A[Document] --> B[Text Splitter]
B --> C[Embedding Model]
C --> D[PostgreSQL + pgvector]
E[User Query] --> F[Embedding Model]
F --> G[SQL Similarity Search]
G --> H[Similar Documents]
H --> I[LLM Generator]
I --> J[Final Answer]
D -.-> G
C -.-> F
The beauty of RAG pipelines is that they combine the best of both worlds:
- Knowledge: From your specific documents (retrieval via PostgreSQL)
- Intelligence: From powerful LLMs (generation)
Building Your First Pipeline
Let's build a complete RAG pipeline step by step using PostgreSQL as our vector database. We'll create a system that can ingest multiple documents and answer questions about them using SQL-based similarity search.
Environment Setup
# Install required dependencies
npm install @google/generative-ai pg dotenv
npm install -D @types/pg @types/node typescript ts-node
# Install PostgreSQL and pgvector (if not already installed)
# On macOS with Homebrew:
brew install postgresql
brew install pgvector
# On Ubuntu/Debian:
sudo apt-get install postgresql postgresql-contrib
sudo apt-get install postgresql-14-pgvector
Make sure your .env file contains both your Google API key and database credentials:
# .env
GOOGLE_API_KEY=your_gemini_api_key_here
DB_HOST=localhost
DB_PORT=5432
DB_NAME=rag_pipeline_db
DB_USER=postgres
DB_PASSWORD=your_password
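Optionally, you can fail fast at startup if any of these variables are missing. This is a small, illustrative check, not part of the pipeline itself:
// env-check.ts (illustrative) — verify required environment variables are present
import * as dotenv from 'dotenv';
dotenv.config();

const required = ['GOOGLE_API_KEY', 'DB_HOST', 'DB_PORT', 'DB_NAME', 'DB_USER', 'DB_PASSWORD'];
const missing = required.filter((name) => !process.env[name]);
if (missing.length > 0) {
  throw new Error(`Missing environment variables: ${missing.join(', ')}`);
}
console.log('All required environment variables are set');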
Working Code Example
Let's build our RAG pipeline by combining everything we've learned, using PostgreSQL for vector storage:
Step 1: Database Setup
First, create the database schema in pgAdmin or psql:
-- Create the pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- Create document_chunks table for our RAG pipeline
CREATE TABLE document_chunks (
id SERIAL PRIMARY KEY,
chunk_id VARCHAR(255) UNIQUE NOT NULL,
content TEXT NOT NULL,
embedding vector(768), -- Google's text-embedding-004 uses 768 dimensions
metadata JSONB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create an index for faster similarity searches
-- (pgvector recommends creating ivfflat indexes after the table already contains data)
CREATE INDEX ON document_chunks USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
-- The UNIQUE constraint on chunk_id already creates an index, so no separate lookup index is needed
This SQL creates our vector-enabled table specifically designed for RAG pipeline chunks.
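Before moving on, it is worth confirming the extension and table exist. These are simple checks you can run in psql or pgAdmin:
-- Confirm pgvector is installed and see its version
SELECT extname, extversion FROM pg_extension WHERE extname = 'vector';

-- Confirm the document_chunks table exists (should return 0 before ingestion)
SELECT COUNT(*) FROM document_chunks;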
Step 2: Set Up Imports and Interfaces
// index.ts
import { GoogleGenerativeAI } from '@google/generative-ai';
import { Client } from 'pg';
import * as dotenv from 'dotenv';
dotenv.config();
interface DocumentChunk {
id: number;
chunk_id: string;
content: string;
embedding: number[];
metadata: {
source: string;
chunkIndex: number;
timestamp: Date;
};
}
interface SearchResult {
chunk: DocumentChunk;
similarity: number;
}
This defines the structure for our document chunks and search results, building on what we learned about PostgreSQL vector storage.
Step 3: Create the PostgreSQL RAG Pipeline Class
// index.ts
class PostgreSQLRAGPipeline {
private client: Client;
private genAI: GoogleGenerativeAI;
private embeddingModel: any;
private generativeModel: any;
constructor() {
this.client = new Client({
host: process.env.DB_HOST || 'localhost',
port: parseInt(process.env.DB_PORT || '5432'),
database: process.env.DB_NAME || 'rag_pipeline_db',
user: process.env.DB_USER || 'postgres',
password: process.env.DB_PASSWORD || 'password',
});
this.genAI = new GoogleGenerativeAI(process.env.GOOGLE_API_KEY!);
this.embeddingModel = this.genAI.getGenerativeModel({
model: "text-embedding-004"
});
this.generativeModel = this.genAI.getGenerativeModel({
model: "gemini-2.5-flash"
});
}
async connect() {
await this.client.connect();
console.log('Connected to PostgreSQL database');
}
async disconnect() {
await this.client.end();
console.log('Disconnected from PostgreSQL database');
}
}
This initializes our RAG pipeline with PostgreSQL connection and both embedding and generative models.
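At this point you can already run a quick connection smoke test; a minimal, illustrative sketch:
// Connection smoke test (illustrative)
(async () => {
  const pipeline = new PostgreSQLRAGPipeline();
  await pipeline.connect();    // should log "Connected to PostgreSQL database"
  await pipeline.disconnect(); // should log "Disconnected from PostgreSQL database"
})().catch(console.error);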
Step 4: Implement Text Chunking
// index.ts (add to PostgreSQLRAGPipeline class)
private splitIntoChunks(text: string, chunkSize: number = 500, overlap: number = 50): string[] {
const chunks: string[] = [];
let start = 0;
while (start < text.length) {
let end = start + chunkSize;
// If we're not at the end, try to break at a sentence boundary
if (end < text.length) {
const lastPeriod = text.lastIndexOf('. ', end);
const lastNewline = text.lastIndexOf('\n', end);
const breakPoint = Math.max(lastPeriod, lastNewline);
if (breakPoint > start + chunkSize * 0.5) {
end = breakPoint + 1;
}
}
chunks.push(text.slice(start, end).trim());
start = end - overlap; // Create overlap between chunks
}
return chunks.filter(chunk => chunk.length > 0);
}
This implements intelligent text chunking that tries to break at natural boundaries while maintaining some overlap for context.
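To see how the chunker behaves before wiring it into ingestion, you can run a quick sanity check. Since splitIntoChunks is private, this illustrative snippet reaches in with a type assertion purely for testing:
// Chunking sanity check (illustrative, test-only access to a private method)
const rag = new PostgreSQLRAGPipeline();
const sampleText = 'TypeScript adds static types to JavaScript. '.repeat(40); // ~1,760 characters
const chunks: string[] = (rag as any).splitIntoChunks(sampleText);
console.log(`Chunks: ${chunks.length}`); // expect roughly 4 chunks with the default 500-char size and 50-char overlap
chunks.forEach((chunk, i) => console.log(`Chunk ${i}: ${chunk.length} chars`));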
Step 5: Document Ingestion Pipeline with PostgreSQL
// index.ts (add to PostgreSQLRAGPipeline class)
async ingestDocument(content: string, source: string): Promise<void> {
console.log(`Ingesting document: ${source}`);
try {
// Step 1: Split document into chunks
const textChunks = this.splitIntoChunks(content);
console.log(`Created ${textChunks.length} chunks`);
// Step 2: Create embeddings and store in PostgreSQL
for (let i = 0; i < textChunks.length; i++) {
const chunk = textChunks[i];
// Create embedding for the chunk
const result = await this.embeddingModel.embedContent(chunk);
const embedding = result.embedding.values;
// Prepare chunk data
const chunkId = `${source}_chunk_${i}`;
const metadata = {
source: source,
chunkIndex: i,
timestamp: new Date()
};
// Insert into PostgreSQL
const query = `
INSERT INTO document_chunks (chunk_id, content, embedding, metadata)
VALUES ($1, $2, $3, $4)
ON CONFLICT (chunk_id) DO UPDATE SET
content = EXCLUDED.content,
embedding = EXCLUDED.embedding,
metadata = EXCLUDED.metadata
`;
const values = [
chunkId,
chunk,
`[${embedding.join(',')}]`, // Convert array to PostgreSQL vector format
JSON.stringify(metadata)
];
await this.client.query(query, values);
console.log(`Processed chunk ${i + 1}/${textChunks.length}`);
}
console.log(`Successfully ingested ${source}`);
} catch (error) {
console.error(`Error ingesting document ${source}:`, error);
throw error;
}
}
This implements the complete ingestion pipeline using PostgreSQL: chunking → embedding → SQL storage.
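As a usage example, here is how you might ingest a local Markdown file from disk; the file path is a placeholder, so point it at any text file you have:
// Ingest a local file (illustrative — the path is a placeholder)
import * as fs from 'fs';

(async () => {
  const rag = new PostgreSQLRAGPipeline();
  await rag.connect();
  const content = fs.readFileSync('./docs/typescript_basics.md', 'utf-8');
  await rag.ingestDocument(content, 'typescript_basics.md');
  await rag.disconnect();
})().catch(console.error);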
Step 6: PostgreSQL Similarity Search Implementation
// index.ts (add to PostgreSQLRAGPipeline class)
async searchSimilarChunks(query: string, topK: number = 3): Promise<SearchResult[]> {
try {
// Create embedding for the query
const result = await this.embeddingModel.embedContent(query);
const queryEmbedding = result.embedding.values;
// Execute similarity search using PostgreSQL with pgvector
const searchQuery = `
SELECT
id,
chunk_id,
content,
metadata,
created_at,
1 - (embedding <=> $1) AS similarity
FROM document_chunks
ORDER BY embedding <=> $1
LIMIT $2
`;
const queryVector = `[${queryEmbedding.join(',')}]`;
const res = await this.client.query(searchQuery, [queryVector, topK]);
return res.rows.map(row => ({
chunk: {
id: row.id,
chunk_id: row.chunk_id,
content: row.content,
embedding: [], // We don't need to return the full embedding
metadata: row.metadata
},
similarity: parseFloat(row.similarity)
}));
} catch (error) {
console.error('Error searching similar chunks:', error);
throw error;
}
}
This implements the search functionality using PostgreSQL's native vector operations with the <=> cosine distance operator from tutorial 4.3.
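You can also call the retrieval step on its own to inspect what comes back before any answer is generated; a minimal sketch:
// Inspect retrieval results on their own (illustrative)
(async () => {
  const rag = new PostgreSQLRAGPipeline();
  await rag.connect();
  const results = await rag.searchSimilarChunks('How do I handle async/await errors?', 3);
  for (const r of results) {
    console.log(`${r.similarity.toFixed(3)}  ${r.chunk.metadata.source}  ${r.chunk.content.slice(0, 80)}...`);
  }
  await rag.disconnect();
})().catch(console.error);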
Step 7: Answer Generation Pipeline
// index.ts (add to PostgreSQLRAGPipeline class)
async generateAnswer(query: string, maxContext: number = 3): Promise<string> {
console.log(`Processing query: "${query}"`);
try {
// Step 1: Search for relevant chunks using PostgreSQL
const searchResults = await this.searchSimilarChunks(query, maxContext);
if (searchResults.length === 0) {
return "I don't have enough information to answer that question.";
}
// Step 2: Prepare context from retrieved chunks
const context = searchResults
.map((result, index) =>
`Context ${index + 1} (similarity: ${result.similarity.toFixed(3)}):\n${result.chunk.content}`
)
.join('\n\n---\n\n');
// Step 3: Create prompt for the LLM
const prompt = `You are a helpful assistant. Answer the user's question based on the provided context.
Context:
${context}
Question: ${query}
Instructions:
- Use only the information provided in the context
- If the context doesn't contain enough information, say so
- Be specific and cite relevant parts of the context
- Keep your answer concise but complete
Answer:`;
// Step 4: Generate answer using the LLM
const result = await this.generativeModel.generateContent(prompt);
return result.response.text();
} catch (error) {
console.error('Error generating answer:', error);
return "I encountered an error while generating the answer.";
}
}
This implements the complete query pipeline: PostgreSQL search → retrieve → generate.
Step 8: Pipeline Demo and Testing
// index.ts
async function demonstrateRAGPipeline() {
const rag = new PostgreSQLRAGPipeline();
try {
await rag.connect();
// Sample documents to ingest
const documents = [
{
source: "typescript_basics.md",
content: `TypeScript is a programming language developed by Microsoft. It adds static type definitions to JavaScript. TypeScript code is compiled to JavaScript, which means it can run anywhere JavaScript runs. The main benefits include better IDE support, early error detection, and improved code maintainability. To get started with TypeScript, you need to install it using npm install -g typescript.`
},
{
source: "async_await.md",
content: `Async/await in TypeScript provides a clean way to handle asynchronous operations. Use async before function declarations and await before asynchronous calls. Error handling with async/await is done using try-catch blocks. For example: async function fetchData() { try { const result = await fetch('/api/data'); return await result.json(); } catch (error) { console.error('Error:', error); } }`
},
{
source: "debugging_tips.md",
content: `Debugging TypeScript applications involves several techniques. Use the debugger statement for breakpoints. Console.log is helpful for quick debugging. TypeScript compiler provides excellent error messages. Use source maps for debugging compiled code. VS Code has excellent TypeScript debugging support with built-in debugging tools.`
}
];
// Ingest all documents
console.log('=== Starting Document Ingestion ===\n');
for (const doc of documents) {
await rag.ingestDocument(doc.content, doc.source);
console.log('---');
}
// Test queries
const queries = [
"How do I handle errors in async/await?",
"What is TypeScript?",
"How can I debug TypeScript code?",
"What are the benefits of using TypeScript?"
];
console.log('\n=== Testing RAG Pipeline ===\n');
for (const query of queries) {
console.log(`🤔 Question: ${query}`);
const answer = await rag.generateAnswer(query);
console.log(`🤖 Answer: ${answer}\n`);
console.log('---\n');
}
} finally {
await rag.disconnect();
}
}
This demonstrates the complete pipeline in action with sample documents and queries using PostgreSQL.
Step 9: SQL Queries for pgAdmin Debugging
Here are useful SQL queries you can run directly in pgAdmin to debug your RAG pipeline:
-- View all ingested document chunks
SELECT
chunk_id,
LEFT(content, 100) as content_preview,
metadata->>'source' as source,
metadata->>'chunkIndex' as chunk_index,
created_at
FROM document_chunks
ORDER BY created_at DESC;
-- Test similarity search manually (replace with actual query embedding)
SELECT
chunk_id,
LEFT(content, 150) as content_preview,
metadata->>'source' as source,
1 - (embedding <=> '[0.1, -0.2, 0.3, ...]') AS similarity
FROM document_chunks
ORDER BY similarity DESC
LIMIT 5;
-- Get statistics about your RAG pipeline data
SELECT
COUNT(*) as total_chunks,
COUNT(DISTINCT metadata->>'source') as unique_sources,
AVG(LENGTH(content)) as avg_chunk_length,
MIN(created_at) as first_ingestion,
MAX(created_at) as last_ingestion
FROM document_chunks;
-- Find chunks from a specific source
SELECT
chunk_id,
content,
metadata->>'chunkIndex' as chunk_index
FROM document_chunks
WHERE metadata->>'source' = 'typescript_basics.md'
ORDER BY (metadata->>'chunkIndex')::int;
Step 10: Run the Complete Pipeline
// index.ts
async function main() {
try {
await demonstrateRAGPipeline();
} catch (error) {
console.error('Pipeline error:', error);
}
}
main().catch(console.error);
This ties everything together into a runnable demonstration using PostgreSQL for vector storage.
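Assuming a standard ts-node setup, you can run the demo with:
npx ts-node index.ts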
Testing & Debugging
Building a robust RAG pipeline requires thorough testing and debugging. Here are the key areas to focus on:
1. Testing Document Ingestion
// Add this method to PostgreSQLRAGPipeline class
async debugIngestion(content: string, source: string) {
console.log('=== Ingestion Debug ===');
console.log(`Document length: ${content.length} characters`);
const chunks = this.splitIntoChunks(content);
console.log(`Number of chunks: ${chunks.length}`);
chunks.forEach((chunk, index) => {
console.log(`Chunk ${index}: ${chunk.length} chars`);
console.log(`Preview: ${chunk.substring(0, 100)}...`);
});
// Check what's actually stored in PostgreSQL
const query = `
SELECT chunk_id, LEFT(content, 100) as preview, metadata
FROM document_chunks
WHERE metadata->>'source' = $1
ORDER BY (metadata->>'chunkIndex')::int
`;
const result = await this.client.query(query, [source]);
console.log('Stored chunks:', result.rows);
}
Common Issues:
- Chunks too small: Information gets fragmented
- Chunks too large: Exceeds embedding model token limits
- Poor chunking boundaries: Context gets split unnaturally
- Database connection issues: Check PostgreSQL connection and pgvector extension
2. Testing PostgreSQL Similarity Search
// Add this method to PostgreSQLRAGPipeline class
async debugSearch(query: string) {
console.log('=== Search Debug ===');
console.log(`Query: "${query}"`);
const results = await this.searchSimilarChunks(query, 5);
results.forEach((result, index) => {
console.log(`\nResult ${index + 1}:`);
console.log(`Similarity: ${result.similarity.toFixed(4)}`);
console.log(`Source: ${result.chunk.metadata.source}`);
console.log(`Chunk ID: ${result.chunk.chunk_id}`);
console.log(`Text: ${result.chunk.content.substring(0, 150)}...`);
});
// Also show the raw SQL query results
const embeddingResult = await this.embeddingModel.embedContent(query);
const queryVector = `[${embeddingResult.embedding.values.join(',')}]`;
const sqlQuery = `
SELECT chunk_id, 1 - (embedding <=> $1) AS similarity
FROM document_chunks
ORDER BY embedding <=> $1
LIMIT 3
`;
const sqlResult = await this.client.query(sqlQuery, [queryVector]);
console.log('\nDirect SQL results:', sqlResult.rows);
}
What to Look For:
- Similarity scores: Relevant content typically scores above roughly 0.7, though the exact range depends on the embedding model
- Ranking order: Most relevant chunks should rank highest
- SQL performance: Queries should execute quickly with proper indexing (see the EXPLAIN ANALYZE check after this list)
- Coverage: Important information shouldn't be missed
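To check whether the index is actually being used and how long the search takes inside PostgreSQL, you can run EXPLAIN ANALYZE on the search query in psql or pgAdmin (the embedding literal is a placeholder, just like in the Step 9 queries):
-- Check the query plan for the similarity search (placeholder embedding)
EXPLAIN ANALYZE
SELECT chunk_id
FROM document_chunks
ORDER BY embedding <=> '[0.1, -0.2, 0.3, ...]'
LIMIT 5;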
3. Testing Answer Quality
// Add this method to PostgreSQLRAGPipeline class
async debugAnswerGeneration(query: string) {
console.log('=== Answer Generation Debug ===');
const searchResults = await this.searchSimilarChunks(query, 3);
console.log('Retrieved contexts:');
searchResults.forEach((result, index) => {
console.log(`\nContext ${index + 1} (${result.similarity.toFixed(3)})`);
console.log(`Source: ${result.chunk.metadata.source}`);
console.log(`Chunk ID: ${result.chunk.chunk_id}`);
console.log(`Text: ${result.chunk.content}`);
});
const answer = await this.generateAnswer(query);
console.log(`\nGenerated Answer:\n${answer}`);
}
Quality Indicators:
- Relevance: Answer addresses the question
- Accuracy: Information is correct
- Completeness: Important details aren't missing
- Context usage: Answer references retrieved information
4. Common Pipeline Issues and Solutions
Issue: PostgreSQL connection errors
// Solution: Add connection retry logic
async connectWithRetry(maxRetries: number = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
await this.client.connect();
console.log('Connected to PostgreSQL database');
return;
} catch (error) {
console.log(`Connection attempt ${i + 1} failed:`, error);
if (i === maxRetries - 1) throw error;
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
}
Issue: No relevant results found
// Solution: Check similarity thresholds and improve chunking
async searchWithThreshold(query: string, minSimilarity: number = 0.5) {
const results = await this.searchSimilarChunks(query, 10);
const filtered = results.filter(r => r.similarity >= minSimilarity);
if (filtered.length === 0) {
console.log('Warning: No results above similarity threshold');
console.log('Top result similarity:', results[0]?.similarity);
}
return filtered.slice(0, 3);
}
Issue: Slow PostgreSQL queries
// Solution: Monitor and optimize queries
async measureSearchPerformance(query: string) {
const startTime = Date.now();
const results = await this.searchSimilarChunks(query);
const endTime = Date.now();
console.log(`Search took ${endTime - startTime}ms`);
console.log(`Searched through ${await this.getChunkCount()} chunks`);
return results;
}
async getChunkCount(): Promise<number> {
const result = await this.client.query('SELECT COUNT(*) FROM document_chunks');
return parseInt(result.rows[0].count);
}
5. Performance Monitoring
// Add this method to PostgreSQLRAGPipeline class
async measurePerformance(query: string) {
const startTime = Date.now();
const searchStart = Date.now();
const results = await this.searchSimilarChunks(query);
const searchTime = Date.now() - searchStart;
const generateStart = Date.now();
const answer = await this.generateAnswer(query);
const generateTime = Date.now() - generateStart;
const totalTime = Date.now() - startTime;
const chunkCount = await this.getChunkCount();
console.log('=== Performance Metrics ===');
console.log(`Search time: ${searchTime}ms`);
console.log(`Generation time: ${generateTime}ms`);
console.log(`Total time: ${totalTime}ms`);
console.log(`Chunks in database: ${chunkCount}`);
return { searchTime, generateTime, totalTime };
}
Summary
Congratulations! You've built a complete, working RAG pipeline using PostgreSQL that combines everything from the previous tutorials:
- Document Ingestion: Smart chunking and embedding creation from tutorial 4.2
- PostgreSQL Vector Storage: Efficient similarity search using SQL from tutorial 4.3
- Query Processing: Semantic search and context retrieval using pgvector
- Answer Generation: LLM-powered responses using retrieved context
Key achievements in this tutorial:
- Built a complete RAG pipeline that ingests documents and answers questions using PostgreSQL
- Implemented intelligent text chunking with overlap for better context preservation
- Combined embedding creation, PostgreSQL vector storage, and SQL similarity search into one system
- Created a query pipeline that retrieves relevant context using SQL and generates accurate answers
- Added debugging tools to troubleshoot and optimize pipeline performance using pgAdmin
Your RAG pipeline can now process any text documents, store them persistently in PostgreSQL, and answer questions about their content using production-ready vector search. In the next tutorial, we'll take this foundation and build a practical CLI application that users can interact with directly.
Complete Code
You can find the complete, runnable code for this tutorial on GitHub: https://github.com/avestalabs/academy/tree/main/4-rag/building-rag-pipeline