Data Preprocessing: Preparing Data for Machine Learning
Data preprocessing is a critical step in the machine learning pipeline: it transforms raw data into a format suitable for model training. This guide covers the essential preprocessing techniques with practical Python implementations.
Introduction to Data Preprocessing
Data preprocessing involves cleaning, transforming, and organizing raw data to improve the quality and effectiveness of machine learning models.
Why Data Preprocessing Matters
- Improves Model Performance: Clean, consistently formatted data leads to better predictions
- Reduces Training Time: Properly scaled features help gradient-based and distance-based algorithms converge faster
- Prevents Overfitting: Removing noise and extreme outliers reduces the chance of fitting spurious patterns
- Enables Algorithm Compatibility: Different algorithms expect different input formats, such as purely numeric, scaled, or encoded features (a minimal pipeline sketch follows this list)
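To make these points concrete, here is a minimal sketch of preprocessing feeding directly into model training, assuming scikit-learn is installed. The column names and synthetic data are purely illustrative; the point is that imputation and scaling sit inside the same pipeline as the model, so the identical transformations are applied at training and prediction time.
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Hypothetical toy dataset: two numeric features with some missing incomes
rng = np.random.default_rng(0)
X = pd.DataFrame({
    'age': rng.normal(40, 12, 500),
    'income': rng.normal(60000, 15000, 500),
})
X.loc[rng.choice(500, size=25, replace=False), 'income'] = np.nan
y = (X['age'] > 40).astype(int)  # illustrative target

# Chaining imputation and scaling in a Pipeline guarantees the same
# transformations are applied at training and prediction time
pipe = Pipeline([
    ('impute', SimpleImputer(strategy='median')),
    ('scale', StandardScaler()),
    ('model', LogisticRegression()),
])
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
pipe.fit(X_train, y_train)
print(f"Held-out accuracy: {pipe.score(X_test, y_test):.3f}")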
Data Cleaning Techniques
Handling Missing Values
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
import matplotlib.pyplot as plt
import seaborn as sns
class MissingValueHandler:
def __init__(self):
self.imputers = {}
self.missing_patterns = {}
def analyze_missing_data(self, df):
"""Analyze missing data patterns"""
missing_info = {
'missing_counts': df.isnull().sum(),
'missing_percentages': (df.isnull().sum() / len(df)) * 100,
'missing_patterns': df.isnull().sum(axis=1).value_counts().sort_index()
}
# Visualize missing data
plt.figure(figsize=(12, 8))
# Missing data heatmap
plt.subplot(2, 2, 1)
sns.heatmap(df.isnull(), cbar=True, yticklabels=False, cmap='viridis')
plt.title('Missing Data Heatmap')
# Missing data bar plot
plt.subplot(2, 2, 2)
missing_counts = df.isnull().sum()
missing_counts[missing_counts > 0].plot(kind='bar')
plt.title('Missing Values by Column')
plt.xticks(rotation=45)
# Missing data correlation
plt.subplot(2, 2, 3)
missing_corr = df.isnull().corr()
sns.heatmap(missing_corr, annot=True, cmap='coolwarm', center=0)
plt.title('Missing Data Correlation')
plt.tight_layout()
plt.show()
return missing_info
def simple_imputation(self, df, strategy='mean', columns=None):
"""Simple imputation strategies"""
if columns is None:
columns = df.select_dtypes(include=[np.number]).columns
df_imputed = df.copy()
for column in columns:
if df[column].isnull().any():
if strategy == 'mean':
imputer = SimpleImputer(strategy='mean')
elif strategy == 'median':
imputer = SimpleImputer(strategy='median')
elif strategy == 'mode':
imputer = SimpleImputer(strategy='most_frequent')
elif strategy == 'constant':
imputer = SimpleImputer(strategy='constant', fill_value=0)
else:
raise ValueError(f"Unknown imputation strategy: {strategy}")
df_imputed[column] = imputer.fit_transform(df[[column]]).ravel()
self.imputers[column] = imputer
return df_imputed
def knn_imputation(self, df, n_neighbors=5, columns=None):
"""KNN-based imputation"""
if columns is None:
columns = df.select_dtypes(include=[np.number]).columns
df_imputed = df.copy()
imputer = KNNImputer(n_neighbors=n_neighbors)
df_imputed[columns] = imputer.fit_transform(df[columns])
self.imputers['knn'] = imputer
return df_imputed
def iterative_imputation(self, df, max_iter=10, columns=None):
"""Iterative imputation using regression"""
if columns is None:
columns = df.select_dtypes(include=[np.number]).columns
df_imputed = df.copy()
imputer = IterativeImputer(max_iter=max_iter, random_state=42)
df_imputed[columns] = imputer.fit_transform(df[columns])
self.imputers['iterative'] = imputer
return df_imputed
def forward_fill_imputation(self, df, columns=None):
"""Forward fill for time series data"""
if columns is None:
columns = df.columns
df_imputed = df.copy()
df_imputed[columns] = df_imputed[columns].ffill()
return df_imputed
def interpolation_imputation(self, df, method='linear', columns=None):
"""Interpolation for time series data"""
if columns is None:
columns = df.select_dtypes(include=[np.number]).columns
df_imputed = df.copy()
for column in columns:
df_imputed[column] = df_imputed[column].interpolate(method=method)
return df_imputed
# Example usage
def demonstrate_missing_value_handling():
# Create sample data with missing values
np.random.seed(42)
data = {
'feature1': np.random.randn(1000),
'feature2': np.random.randn(1000),
'feature3': np.random.randn(1000),
'category': np.random.choice(['A', 'B', 'C'], 1000)
}
df = pd.DataFrame(data)
# Introduce missing values
missing_indices = np.random.choice(df.index, size=200, replace=False)
df.loc[missing_indices[:100], 'feature1'] = np.nan
df.loc[missing_indices[100:150], 'feature2'] = np.nan
df.loc[missing_indices[150:], 'feature3'] = np.nan
handler = MissingValueHandler()
# Analyze missing data
missing_info = handler.analyze_missing_data(df)
print("Missing Value Analysis:")
print(missing_info['missing_counts'])
# Try different imputation methods
df_mean = handler.simple_imputation(df, strategy='mean')
df_knn = handler.knn_imputation(df, n_neighbors=5)
df_iterative = handler.iterative_imputation(df)
print(f"\nOriginal missing values: {df.isnull().sum().sum()}")
print(f"After mean imputation: {df_mean.isnull().sum().sum()}")
print(f"After KNN imputation: {df_knn.isnull().sum().sum()}")
print(f"After iterative imputation: {df_iterative.isnull().sum().sum()}")
demonstrate_missing_value_handling()
Outlier Detection and Treatment
from scipy import stats
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.covariance import EllipticEnvelope
class OutlierDetector:
def __init__(self):
self.detectors = {}
self.outlier_indices = {}
def statistical_outliers(self, df, columns=None, method='iqr', threshold=1.5):
"""Detect outliers using statistical methods"""
if columns is None:
columns = df.select_dtypes(include=[np.number]).columns
outlier_indices = set()
for column in columns:
if method == 'iqr':
Q1 = df[column].quantile(0.25)
Q3 = df[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - threshold * IQR
upper_bound = Q3 + threshold * IQR
column_outliers = df[(df[column] < lower_bound) |
(df[column] > upper_bound)].index
elif method == 'zscore':
# Compute z-scores on non-null values and keep index alignment
non_null = df[column].dropna()
z_scores = np.abs(stats.zscore(non_null))
column_outliers = non_null.index[z_scores > threshold]
elif method == 'modified_zscore':
median = df[column].median()
mad = np.median(np.abs(df[column].dropna() - median))
if mad == 0:
continue  # constant column: no outliers under this criterion
modified_z_scores = 0.6745 * (df[column] - median) / mad
column_outliers = df[np.abs(modified_z_scores) > threshold].index
else:
raise ValueError(f"Unknown outlier detection method: {method}")
outlier_indices.update(column_outliers)
self.outlier_indices['statistical'] = list(outlier_indices)
return list(outlier_indices)
def isolation_forest_outliers(self, df, contamination=0.1, columns=None):
"""Detect outliers using Isolation Forest"""
if columns is None:
columns = df.select_dtypes(include=[np.number]).columns
iso_forest = IsolationForest(contamination=contamination, random_state=42)
outlier_labels = iso_forest.fit_predict(df[columns])
outlier_indices = df[outlier_labels == -1].index.tolist()
self.detectors['isolation_forest'] = iso_forest
self.outlier_indices['isolation_forest'] = outlier_indices
return outlier_indices
def local_outlier_factor(self, df, n_neighbors=20, columns=None):
"""Detect outliers using Local Outlier Factor"""
if columns is None:
columns = df.select_dtypes(include=[np.number]).columns
lof = LocalOutlierFactor(n_neighbors=n_neighbors)
outlier_labels = lof.fit_predict(df[columns])
outlier_indices = df[outlier_labels == -1].index.tolist()
self.outlier_indices['lof'] = outlier_indices
return outlier_indices
def elliptic_envelope_outliers(self, df, contamination=0.1, columns=None):
"""Detect outliers using Elliptic Envelope"""
if columns is None:
columns = df.select_dtypes(include=[np.number]).columns
ee = EllipticEnvelope(contamination=contamination, random_state=42)
outlier_labels = ee.fit_predict(df[columns])
outlier_indices = df[outlier_labels == -1].index.tolist()
self.detectors['elliptic_envelope'] = ee
self.outlier_indices['elliptic_envelope'] = outlier_indices
return outlier_indices
def treat_outliers(self, df, outlier_indices, method='remove'):
"""Treat detected outliers"""
df_treated = df.copy()
if method == 'remove':
df_treated = df_treated.drop(outlier_indices)
elif method == 'cap':
numeric_columns = df.select_dtypes(include=[np.number]).columns
for column in numeric_columns:
Q1 = df[column].quantile(0.25)
Q3 = df[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
df_treated[column] = df_treated[column].clip(lower=lower_bound,
upper=upper_bound)
elif method == 'transform':
# Log transformation for positive values
numeric_columns = df.select_dtypes(include=[np.number]).columns
for column in numeric_columns:
if (df[column] > 0).all():
df_treated[column] = np.log1p(df[column])
return df_treated
def visualize_outliers(self, df, columns=None):
"""Visualize outliers in the data"""
if columns is None:
columns = df.select_dtypes(include=[np.number]).columns[:4] # Limit to 4 columns
fig, axes = plt.subplots(2, len(columns), figsize=(4*len(columns), 8))
for i, column in enumerate(columns):
# Box plot
axes[0, i].boxplot(df[column].dropna())
axes[0, i].set_title(f'{column} - Box Plot')
# Histogram
axes[1, i].hist(df[column].dropna(), bins=30, alpha=0.7)
axes[1, i].set_title(f'{column} - Histogram')
plt.tight_layout()
plt.show()
# Example usage
def demonstrate_outlier_detection():
# Create sample data with outliers
np.random.seed(42)
normal_data = np.random.randn(950, 3)
outlier_data = np.random.randn(50, 3) * 5 + 10 # Outliers
data = np.vstack([normal_data, outlier_data])
df = pd.DataFrame(data, columns=['feature1', 'feature2', 'feature3'])
detector = OutlierDetector()
# Visualize data
detector.visualize_outliers(df)
# Detect outliers using different methods
iqr_outliers = detector.statistical_outliers(df, method='iqr')
iso_outliers = detector.isolation_forest_outliers(df, contamination=0.05)
lof_outliers = detector.local_outlier_factor(df)
print(f"IQR outliers detected: {len(iqr_outliers)}")
print(f"Isolation Forest outliers detected: {len(iso_outliers)}")
print(f"LOF outliers detected: {len(lof_outliers)}")
# Treat outliers
df_no_outliers = detector.treat_outliers(df, iqr_outliers, method='remove')
df_capped = detector.treat_outliers(df, iqr_outliers, method='cap')
print(f"\nOriginal data shape: {df.shape}")
print(f"After removing outliers: {df_no_outliers.shape}")
print(f"After capping outliers: {df_capped.shape}")
demonstrate_outlier_detection()
Feature Scaling and Normalization
Scaling Techniques
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, PowerTransformer
from sklearn.preprocessing import QuantileTransformer, Normalizer
class FeatureScaler:
def __init__(self):
self.scalers = {}
self.scaling_methods = {
'standard': StandardScaler(),
'minmax': MinMaxScaler(),
'robust': RobustScaler(),
'power': PowerTransformer(method='yeo-johnson'),
'quantile_uniform': QuantileTransformer(output_distribution='uniform'),
'quantile_normal': QuantileTransformer(output_distribution='normal'),
'normalizer': Normalizer()
}
def fit_transform_scaling(self, df, method='standard', columns=None):
"""Fit and transform data using specified scaling method"""
if columns is None:
columns = df.select_dtypes(include=[np.number]).columns
df_scaled = df.copy()
scaler = self.scaling_methods[method]
df_scaled[columns] = scaler.fit_transform(df[columns])
self.scalers[method] = scaler
return df_scaled
def compare_scaling_methods(self, df, columns=None):
"""Compare different scaling methods"""
if columns is None:
columns = df.select_dtypes(include=[np.number]).columns[:2] # Limit for visualization
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
axes = axes.ravel()
# Original data
axes[0].scatter(df[columns[0]], df[columns[1]], alpha=0.6)
axes[0].set_title('Original Data')
axes[0].set_xlabel(columns[0])
axes[0].set_ylabel(columns[1])
# Different scaling methods
methods = ['standard', 'minmax', 'robust', 'power', 'quantile_uniform', 'quantile_normal']
for i, method in enumerate(methods, 1):
df_scaled = self.fit_transform_scaling(df, method=method, columns=columns)
axes[i].scatter(df_scaled[columns[0]], df_scaled[columns[1]], alpha=0.6)
axes[i].set_title(f'{method.title()} Scaling')
axes[i].set_xlabel(f'{columns[0]} (scaled)')
axes[i].set_ylabel(f'{columns[1]} (scaled)')
plt.tight_layout()
plt.show()
# Statistical comparison
comparison_stats = {}
for method in methods:
df_scaled = self.fit_transform_scaling(df, method=method, columns=columns)
comparison_stats[method] = {
'mean': df_scaled[columns].mean().mean(),
'std': df_scaled[columns].std().mean(),
'min': df_scaled[columns].min().min(),
'max': df_scaled[columns].max().max()
}
comparison_df = pd.DataFrame(comparison_stats).T
print("Scaling Methods Comparison:")
print(comparison_df.round(3))
return comparison_df
def transform_new_data(self, df, method='standard', columns=None):
"""Transform new data using fitted scaler"""
if method not in self.scalers:
raise ValueError(f"Scaler for method '{method}' not fitted yet")
if columns is None:
columns = df.select_dtypes(include=[np.number]).columns
df_scaled = df.copy()
df_scaled[columns] = self.scalers[method].transform(df[columns])
return df_scaled
# Example usage
def demonstrate_feature_scaling():
# Create sample data with different scales
np.random.seed(42)
data = {
'small_values': np.random.randn(1000) * 0.1,
'medium_values': np.random.randn(1000) * 10 + 50,
'large_values': np.random.randn(1000) * 1000 + 5000,
'skewed_values': np.random.exponential(2, 1000)
}
df = pd.DataFrame(data)
scaler = FeatureScaler()
print("Original Data Statistics:")
print(df.describe())
# Compare scaling methods
comparison_stats = scaler.compare_scaling_methods(df)
# Demonstrate specific scaling
df_standard = scaler.fit_transform_scaling(df, method='standard')
print(f"\nAfter Standard Scaling:")
print(df_standard.describe())
demonstrate_feature_scaling()
Categorical Data Encoding
Encoding Techniques
from sklearn.preprocessing import LabelEncoder, OrdinalEncoder
import category_encoders as ce  # third-party package: pip install category_encoders
class CategoricalEncoder:
def __init__(self):
self.encoders = {}
self.encoding_mappings = {}
def label_encoding(self, df, columns=None):
"""Label encoding for categorical variables"""
if columns is None:
columns = df.select_dtypes(include=['object']).columns
df_encoded = df.copy()
for column in columns:
le = LabelEncoder()
df_encoded[column] = le.fit_transform(df[column].astype(str))
self.encoders[f'{column}_label'] = le
# Store mapping for interpretation
self.encoding_mappings[column] = dict(zip(le.classes_, le.transform(le.classes_)))
return df_encoded
def one_hot_encoding(self, df, columns=None, drop_first=False):
"""One-hot encoding for categorical variables"""
if columns is None:
columns = df.select_dtypes(include=['object']).columns
df_encoded = df.copy()
for column in columns:
# Create dummy variables
dummies = pd.get_dummies(df[column], prefix=column, drop_first=drop_first)
# Drop original column and add dummy columns
df_encoded = df_encoded.drop(column, axis=1)
df_encoded = pd.concat([df_encoded, dummies], axis=1)
return df_encoded
def ordinal_encoding(self, df, columns=None, categories=None):
"""Ordinal encoding with custom ordering"""
if columns is None:
columns = df.select_dtypes(include=['object']).columns
df_encoded = df.copy()
for column in columns:
if categories and column in categories:
# Use custom ordering
oe = OrdinalEncoder(categories=[categories[column]])
else:
# Use automatic ordering
oe = OrdinalEncoder()
df_encoded[column] = oe.fit_transform(df[[column]]).ravel()
self.encoders[f'{column}_ordinal'] = oe
return df_encoded
def target_encoding(self, df, target_column, categorical_columns=None):
"""Target encoding (mean encoding)"""
if categorical_columns is None:
categorical_columns = df.select_dtypes(include=['object']).columns
df_encoded = df.copy()
for column in categorical_columns:
if column != target_column:
# Calculate mean target value for each category
# Caution: naive target encoding on the full dataset can leak the target;
# in practice, prefer out-of-fold means or smoothing
target_means = df.groupby(column)[target_column].mean()
# Map categories to their target means
df_encoded[f'{column}_target_encoded'] = df[column].map(target_means)
# Store encoding for new data
self.encoding_mappings[f'{column}_target'] = target_means.to_dict()
return df_encoded
def frequency_encoding(self, df, columns=None):
"""Frequency encoding"""
if columns is None:
columns = df.select_dtypes(include=['object']).columns
df_encoded = df.copy()
for column in columns:
# Calculate frequency of each category
freq_map = df[column].value_counts().to_dict()
# Replace categories with their frequencies
df_encoded[f'{column}_freq'] = df[column].map(freq_map)
# Store mapping
self.encoding_mappings[f'{column}_freq'] = freq_map
return df_encoded
def binary_encoding(self, df, columns=None):
"""Binary encoding for high cardinality categorical variables"""
if columns is None:
columns = df.select_dtypes(include=['object']).columns
df_encoded = df.copy()
for column in columns:
be = ce.BinaryEncoder(cols=[column])
binary_encoded = be.fit_transform(df[[column]])
# Drop original column and add binary encoded columns
df_encoded = df_encoded.drop(column, axis=1)
df_encoded = pd.concat([df_encoded, binary_encoded], axis=1)
self.encoders[f'{column}_binary'] = be
return df_encoded
def hash_encoding(self, df, columns=None, n_components=8):
"""Hash encoding for high cardinality categorical variables"""
if columns is None:
columns = df.select_dtypes(include=['object']).columns
df_encoded = df.copy()
for column in columns:
he = ce.HashingEncoder(cols=[column], n_components=n_components)
hash_encoded = he.fit_transform(df[[column]])
# Drop original column and add hash encoded columns
df_encoded = df_encoded.drop(column, axis=1)
df_encoded = pd.concat([df_encoded, hash_encoded], axis=1)
self.encoders[f'{column}_hash'] = he
return df_encoded
def compare_encoding_methods(self, df, target_column=None):
"""Compare different encoding methods"""
categorical_columns = df.select_dtypes(include=['object']).columns
if len(categorical_columns) == 0:
print("No categorical columns found")
return
results = {}
# Original data
results['original'] = {
'shape': df.shape,
'memory_usage': df.memory_usage(deep=True).sum()
}
# Label encoding
df_label = self.label_encoding(df, categorical_columns)
results['label_encoding'] = {
'shape': df_label.shape,
'memory_usage': df_label.memory_usage(deep=True).sum()
}
# One-hot encoding
df_onehot = self.one_hot_encoding(df, categorical_columns)
results['one_hot_encoding'] = {
'shape': df_onehot.shape,
'memory_usage': df_onehot.memory_usage(deep=True).sum()
}
# Binary encoding
df_binary = self.binary_encoding(df, categorical_columns)
results['binary_encoding'] = {
'shape': df_binary.shape,
'memory_usage': df_binary.memory_usage(deep=True).sum()
}
# Frequency encoding
df_freq = self.frequency_encoding(df, categorical_columns)
results['frequency_encoding'] = {
'shape': df_freq.shape,
'memory_usage': df_freq.memory_usage(deep=True).sum()
}
# Target encoding (if target column provided)
if target_column and target_column in df.columns:
df_target = self.target_encoding(df, target_column, categorical_columns)
results['target_encoding'] = {
'shape': df_target.shape,
'memory_usage': df_target.memory_usage(deep=True).sum()
}
# Create comparison DataFrame
comparison_df = pd.DataFrame(results).T
print("Encoding Methods Comparison:")
print(comparison_df)
return comparison_df
# Example usage
def demonstrate_categorical_encoding():
# Create sample data with categorical variables
np.random.seed(42)
data = {
'numeric_feature': np.random.randn(1000),
'low_cardinality': np.random.choice(['A', 'B', 'C'], 1000),
'medium_cardinality': np.random.choice([f'Cat_{i}' for i in range(10)], 1000),
'high_cardinality': np.random.choice([f'Item_{i}' for i in range(100)], 1000),
'ordinal_feature': np.random.choice(['Low', 'Medium', 'High'], 1000),
'target': np.random.randint(0, 2, 1000)
}
df = pd.DataFrame(data)
encoder = CategoricalEncoder()
print("Original Data Info:")
print(df.info())
print(f"\nUnique values per categorical column:")
for col in df.select_dtypes(include=['object']).columns:
print(f"{col}: {df[col].nunique()} unique values")
# Compare encoding methods
comparison = encoder.compare_encoding_methods(df, target_column='target')
# Demonstrate specific encodings
print(f"\nLabel Encoding Example:")
df_label = encoder.label_encoding(df, ['low_cardinality'])
print(f"Mapping: {encoder.encoding_mappings['low_cardinality']}")
print(f"\nTarget Encoding Example:")
df_target = encoder.target_encoding(df, 'target', ['low_cardinality'])
print(f"Target means: {encoder.encoding_mappings['low_cardinality_target']}")
demonstrate_categorical_encoding()
Text Preprocessing
Text Cleaning and Normalization
import re
import string
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.stem import PorterStemmer, WordNetLemmatizer
from nltk.chunk import ne_chunk
from nltk.tag import pos_tag
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
import spacy
# Download required NLTK data
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)
nltk.download('maxent_ne_chunker', quiet=True)
nltk.download('words', quiet=True)
class TextPreprocessor:
def __init__(self, language='english'):
self.language = language
self.stop_words = set(stopwords.words(language))
self.stemmer = PorterStemmer()
self.lemmatizer = WordNetLemmatizer()
# Try to load spaCy model
try:
self.nlp = spacy.load('en_core_web_sm')
except OSError:
print("spaCy model not found. Install with: python -m spacy download en_core_web_sm")
self.nlp = None
def basic_cleaning(self, text):
"""Basic text cleaning"""
if not isinstance(text, str):
return ""
# Convert to lowercase
text = text.lower()
# Remove HTML tags
text = re.sub(r'<[^>]+>', '', text)
# Remove URLs
text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
# Remove email addresses
text = re.sub(r'\S+@\S+', '', text)
# Remove extra whitespace
text = re.sub(r'\s+', ' ', text).strip()
return text
def remove_punctuation(self, text, keep_apostrophes=True):
"""Remove punctuation from text"""
if keep_apostrophes:
# Keep apostrophes for contractions
punctuation = string.punctuation.replace("'", "")
else:
punctuation = string.punctuation
translator = str.maketrans('', '', punctuation)
return text.translate(translator)
def remove_numbers(self, text, replace_with=''):
"""Remove or replace numbers in text"""
return re.sub(r'\d+', replace_with, text)
def expand_contractions(self, text):
"""Expand common English contractions"""
contractions = {
"ain't": "is not", "aren't": "are not", "can't": "cannot",
"couldn't": "could not", "didn't": "did not", "doesn't": "does not",
"don't": "do not", "hadn't": "had not", "hasn't": "has not",
"haven't": "have not", "he'd": "he would", "he'll": "he will",
"he's": "he is", "i'd": "i would", "i'll": "i will",
"i'm": "i am", "i've": "i have", "isn't": "is not",
"it'd": "it would", "it'll": "it will", "it's": "it is",
"let's": "let us", "shouldn't": "should not", "that's": "that is",
"there's": "there is", "they'd": "they would", "they'll": "they will",
"they're": "they are", "they've": "they have", "we'd": "we would",
"we're": "we are", "we've": "we have", "weren't": "were not",
"what's": "what is", "where's": "where is", "who's": "who is",
"won't": "will not", "wouldn't": "would not", "you'd": "you would",
"you'll": "you will", "you're": "you are", "you've": "you have"
}
for contraction, expansion in contractions.items():
text = re.sub(contraction, expansion, text, flags=re.IGNORECASE)
return text
def tokenize(self, text, method='word'):
"""Tokenize text into words or sentences"""
if method == 'word':
return word_tokenize(text)
elif method == 'sentence':
return sent_tokenize(text)
else:
return text.split()
def remove_stopwords(self, tokens, custom_stopwords=None):
"""Remove stopwords from token list"""
stop_words = self.stop_words
if custom_stopwords:
stop_words = stop_words.union(set(custom_stopwords))
return [token for token in tokens if token.lower() not in stop_words]
def stem_tokens(self, tokens):
"""Apply stemming to tokens"""
return [self.stemmer.stem(token) for token in tokens]
def lemmatize_tokens(self, tokens):
"""Apply lemmatization to tokens"""
return [self.lemmatizer.lemmatize(token) for token in tokens]
def pos_tagging(self, tokens):
"""Part-of-speech tagging"""
return pos_tag(tokens)
def named_entity_recognition(self, tokens):
"""Named entity recognition"""
pos_tags = pos_tag(tokens)
return ne_chunk(pos_tags)
def advanced_preprocessing_spacy(self, text):
"""Advanced preprocessing using spaCy"""
if self.nlp is None:
# Fall back to the basic pipeline but keep the same return structure
tokens = self.basic_preprocessing(text)
return {'processed_tokens': tokens, 'entities': [], 'sentences': sent_tokenize(text)}
doc = self.nlp(text)
# Extract various linguistic features
processed_tokens = []
entities = []
for token in doc:
# Skip punctuation, spaces, and stop words
if not token.is_punct and not token.is_space and not token.is_stop:
# Use lemmatized form
processed_tokens.append(token.lemma_.lower())
# Extract named entities
for ent in doc.ents:
entities.append((ent.text, ent.label_))
return {
'processed_tokens': processed_tokens,
'entities': entities,
'sentences': [sent.text for sent in doc.sents]
}
def basic_preprocessing(self, text):
"""Complete basic preprocessing pipeline"""
# Basic cleaning
text = self.basic_cleaning(text)
# Expand contractions
text = self.expand_contractions(text)
# Remove punctuation
text = self.remove_punctuation(text)
# Tokenize
tokens = self.tokenize(text)
# Remove stopwords
tokens = self.remove_stopwords(tokens)
# Lemmatize
tokens = self.lemmatize_tokens(tokens)
return tokens
def preprocess_corpus(self, texts, method='basic'):
"""Preprocess a corpus of texts"""
processed_texts = []
for text in texts:
if method == 'basic':
processed = self.basic_preprocessing(text)
processed_texts.append(' '.join(processed))
elif method == 'advanced' and self.nlp:
processed = self.advanced_preprocessing_spacy(text)
processed_texts.append(' '.join(processed['processed_tokens']))
else:
processed_texts.append(self.basic_cleaning(text))
return processed_texts
class TextVectorizer:
def __init__(self):
self.vectorizers = {}
def bag_of_words(self, texts, max_features=1000, ngram_range=(1, 1)):
"""Create bag-of-words representation"""
vectorizer = CountVectorizer(
max_features=max_features,
ngram_range=ngram_range,
lowercase=True,
stop_words='english'
)
bow_matrix = vectorizer.fit_transform(texts)
self.vectorizers['bow'] = vectorizer
return bow_matrix, vectorizer.get_feature_names_out()
def tfidf_vectorization(self, texts, max_features=1000, ngram_range=(1, 1)):
"""Create TF-IDF representation"""
vectorizer = TfidfVectorizer(
max_features=max_features,
ngram_range=ngram_range,
lowercase=True,
stop_words='english'
)
tfidf_matrix = vectorizer.fit_transform(texts)
self.vectorizers['tfidf'] = vectorizer
return tfidf_matrix, vectorizer.get_feature_names_out()
def get_top_features(self, vectorizer_name, n_features=20):
"""Get top features from vectorizer"""
if vectorizer_name not in self.vectorizers:
return None
vectorizer = self.vectorizers[vectorizer_name]
if hasattr(vectorizer, 'idf_'):
# TF-IDF vectorizer
feature_names = vectorizer.get_feature_names_out()
tfidf_scores = vectorizer.idf_
# Lowest IDF scores correspond to the most common terms across the corpus
top_indices = np.argsort(tfidf_scores)[:n_features]
top_features = [(feature_names[i], tfidf_scores[i]) for i in top_indices]
else:
# Count vectorizer
feature_names = vectorizer.get_feature_names_out()
top_features = list(feature_names[:n_features])
return top_features
# Example usage
def demonstrate_text_preprocessing():
# Sample texts
sample_texts = [
"Hello! This is a sample text with URLs like https://example.com and emails like test@email.com.",
"I can't believe it's already 2024! The weather is great today, isn't it?",
"Machine Learning and Natural Language Processing are fascinating fields in AI.",
"Don't forget to preprocess your text data before training your models!",
"Text preprocessing includes cleaning, tokenization, and normalization steps."
]
preprocessor = TextPreprocessor()
vectorizer = TextVectorizer()
print("Original texts:")
for i, text in enumerate(sample_texts):
print(f"{i+1}. {text}")
# Basic preprocessing
print(f"\nBasic preprocessing:")
processed_texts = preprocessor.preprocess_corpus(sample_texts, method='basic')
for i, text in enumerate(processed_texts):
print(f"{i+1}. {text}")
# Advanced preprocessing (if spaCy is available)
if preprocessor.nlp:
print(f"\nAdvanced preprocessing with spaCy:")
for i, text in enumerate(sample_texts[:2]): # Process first 2 texts
result = preprocessor.advanced_preprocessing_spacy(text)
print(f"{i+1}. Tokens: {result['processed_tokens']}")
print(f" Entities: {result['entities']}")
# Vectorization
print(f"\nText Vectorization:")
# Bag of Words
bow_matrix, bow_features = vectorizer.bag_of_words(processed_texts, max_features=50)
print(f"Bag of Words shape: {bow_matrix.shape}")
print(f"Sample features: {bow_features[:10]}")
# TF-IDF
tfidf_matrix, tfidf_features = vectorizer.tfidf_vectorization(processed_texts, max_features=50)
print(f"TF-IDF shape: {tfidf_matrix.shape}")
print(f"Sample features: {tfidf_features[:10]}")
demonstrate_text_preprocessing()
Conclusion
Data preprocessing is a fundamental skill in machine learning that significantly impacts model performance. This guide covered the essential techniques: handling missing values, detecting and treating outliers, scaling features, encoding categorical variables, and preprocessing text.
Next Steps
- Practice with real-world datasets
- Experiment with different preprocessing techniques
- Build automated preprocessing pipelines (see the ColumnTransformer sketch after this list)
- Study domain-specific preprocessing requirements
- Learn about data validation and monitoring techniques
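As a starting point for the automated-pipeline item above, here is a minimal sketch that chains imputation, scaling, and one-hot encoding with a ColumnTransformer. It assumes scikit-learn is available; the column names and the tiny DataFrame are illustrative assumptions, not part of any dataset used earlier in this guide.
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder

# Hypothetical column layout; adjust to your own dataset
numeric_features = ['age', 'income']
categorical_features = ['city', 'segment']

numeric_pipeline = Pipeline([
    ('impute', SimpleImputer(strategy='median')),
    ('scale', StandardScaler()),
])
categorical_pipeline = Pipeline([
    ('impute', SimpleImputer(strategy='most_frequent')),
    ('encode', OneHotEncoder(handle_unknown='ignore')),
])

preprocessor = ColumnTransformer([
    ('num', numeric_pipeline, numeric_features),
    ('cat', categorical_pipeline, categorical_features),
])

# Fit on training data, then reuse the fitted preprocessor (transform only)
# on validation/test data to avoid leakage
df = pd.DataFrame({
    'age': [25, 32, np.nan, 51],
    'income': [40000, np.nan, 52000, 61000],
    'city': ['Austin', 'Boston', 'Austin', np.nan],
    'segment': ['A', 'B', 'B', 'A'],
})
X_processed = preprocessor.fit_transform(df)
print(X_processed.shape)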