Data Preprocessing: Preparing Data for Machine Learning

Data preprocessing is a critical step in the machine learning pipeline that transforms raw data into a format suitable for training algorithms. This comprehensive guide covers essential preprocessing techniques with practical implementations.

Introduction to Data Preprocessing

Data preprocessing involves cleaning, transforming, and organizing raw data to improve the quality and effectiveness of machine learning models.

Why Data Preprocessing Matters

  1. Improves Model Performance: Clean, well-formatted data leads to better predictions
  2. Reduces Training Time: Properly scaled data converges faster during training (see the sketch after this list)
  3. Prevents Overfitting: Removing noise and outliers improves generalization
  4. Enables Algorithm Compatibility: Different algorithms require different data formats
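
As a quick illustration of points 2 and 4, here is a minimal sketch using scikit-learn's make_classification and SGDClassifier (neither appears elsewhere in this guide): the scaler is chained into the same pipeline as a gradient-based classifier, so the model only ever sees inputs on a comparable scale.

import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic data where one feature lives on a much larger scale than the others
X, y = make_classification(n_samples=1000, n_features=5, random_state=42)
X[:, 0] *= 1000

# Gradient-based learners such as SGDClassifier converge more reliably on scaled inputs,
# so the scaler is fitted and applied as part of the same pipeline that trains the model
model = make_pipeline(StandardScaler(), SGDClassifier(random_state=42))
model.fit(X, y)
print(f"Training accuracy with scaling: {model.score(X, y):.3f}")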

Data Cleaning Techniques

Handling Missing Values

import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer  # noqa: F401 -- must precede the IterativeImputer import
from sklearn.impute import IterativeImputer
import matplotlib.pyplot as plt
import seaborn as sns

class MissingValueHandler:
    def __init__(self):
        self.imputers = {}
        self.missing_patterns = {}

    def analyze_missing_data(self, df):
        """Analyze missing data patterns"""
        missing_info = {
            'missing_counts': df.isnull().sum(),
            'missing_percentages': (df.isnull().sum() / len(df)) * 100,
            'missing_patterns': df.isnull().sum(axis=1).value_counts().sort_index()
        }

        # Visualize missing data
        plt.figure(figsize=(12, 8))

        # Missing data heatmap
        plt.subplot(2, 2, 1)
        sns.heatmap(df.isnull(), cbar=True, yticklabels=False, cmap='viridis')
        plt.title('Missing Data Heatmap')

        # Missing data bar plot
        plt.subplot(2, 2, 2)
        missing_counts = df.isnull().sum()
        missing_counts[missing_counts > 0].plot(kind='bar')
        plt.title('Missing Values by Column')
        plt.xticks(rotation=45)

        # Missing data correlation
        plt.subplot(2, 2, 3)
        missing_corr = df.isnull().corr()
        sns.heatmap(missing_corr, annot=True, cmap='coolwarm', center=0)
        plt.title('Missing Data Correlation')

        plt.tight_layout()
        plt.show()

        return missing_info

    def simple_imputation(self, df, strategy='mean', columns=None):
        """Simple imputation strategies"""
        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns

        df_imputed = df.copy()

        for column in columns:
            if df[column].isnull().any():
                if strategy == 'mean':
                    imputer = SimpleImputer(strategy='mean')
                elif strategy == 'median':
                    imputer = SimpleImputer(strategy='median')
                elif strategy == 'mode':
                    imputer = SimpleImputer(strategy='most_frequent')
                elif strategy == 'constant':
                    imputer = SimpleImputer(strategy='constant', fill_value=0)
                else:
                    raise ValueError(f"Unknown imputation strategy: {strategy}")

                df_imputed[column] = imputer.fit_transform(df[[column]]).ravel()
                self.imputers[column] = imputer

        return df_imputed

    def knn_imputation(self, df, n_neighbors=5, columns=None):
        """KNN-based imputation"""
        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns

        df_imputed = df.copy()

        imputer = KNNImputer(n_neighbors=n_neighbors)
        df_imputed[columns] = imputer.fit_transform(df[columns])

        self.imputers['knn'] = imputer
        return df_imputed

    def iterative_imputation(self, df, max_iter=10, columns=None):
        """Iterative imputation using regression"""
        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns

        df_imputed = df.copy()

        imputer = IterativeImputer(max_iter=max_iter, random_state=42)
        df_imputed[columns] = imputer.fit_transform(df[columns])

        self.imputers['iterative'] = imputer
        return df_imputed

    def forward_fill_imputation(self, df, columns=None):
        """Forward fill for time series data"""
        if columns is None:
            columns = df.columns

        df_imputed = df.copy()
        df_imputed[columns] = df_imputed[columns].ffill()

        return df_imputed

    def interpolation_imputation(self, df, method='linear', columns=None):
        """Interpolation for time series data"""
        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns

        df_imputed = df.copy()

        for column in columns:
            df_imputed[column] = df_imputed[column].interpolate(method=method)

        return df_imputed

# Example usage
def demonstrate_missing_value_handling():
    # Create sample data with missing values
    np.random.seed(42)
    data = {
        'feature1': np.random.randn(1000),
        'feature2': np.random.randn(1000),
        'feature3': np.random.randn(1000),
        'category': np.random.choice(['A', 'B', 'C'], 1000)
    }

    df = pd.DataFrame(data)

    # Introduce missing values
    missing_indices = np.random.choice(df.index, size=200, replace=False)
    df.loc[missing_indices[:100], 'feature1'] = np.nan
    df.loc[missing_indices[100:150], 'feature2'] = np.nan
    df.loc[missing_indices[150:], 'feature3'] = np.nan

    handler = MissingValueHandler()

    # Analyze missing data
    missing_info = handler.analyze_missing_data(df)
    print("Missing Value Analysis:")
    print(missing_info['missing_counts'])

    # Try different imputation methods
    df_mean = handler.simple_imputation(df, strategy='mean')
    df_knn = handler.knn_imputation(df, n_neighbors=5)
    df_iterative = handler.iterative_imputation(df)

    print(f"\nOriginal missing values: {df.isnull().sum().sum()}")
    print(f"After mean imputation: {df_mean.isnull().sum().sum()}")
    print(f"After KNN imputation: {df_knn.isnull().sum().sum()}")
    print(f"After iterative imputation: {df_iterative.isnull().sum().sum()}")

demonstrate_missing_value_handling()

Outlier Detection and Treatment

from scipy import stats
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.covariance import EllipticEnvelope

class OutlierDetector:
    def __init__(self):
        self.detectors = {}
        self.outlier_indices = {}

    def statistical_outliers(self, df, columns=None, method='iqr', threshold=1.5):
        """Detect outliers using statistical methods"""
        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns

        outlier_indices = set()

        for column in columns:
            if method == 'iqr':
                Q1 = df[column].quantile(0.25)
                Q3 = df[column].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR

                column_outliers = df[(df[column] < lower_bound) |
                                   (df[column] > upper_bound)].index

            elif method == 'zscore':
                # Drop NaNs first so z-scores stay aligned with the original index;
                # a cutoff of about 3 is typical for this method (1.5 suits the IQR method)
                col = df[column].dropna()
                z_scores = np.abs(stats.zscore(col))
                column_outliers = col.index[z_scores > threshold]

            elif method == 'modified_zscore':
                median = df[column].median()
                mad = np.median(np.abs(df[column] - median))
                # 0.6745 rescales the MAD so it is comparable to a standard deviation under normality
                modified_z_scores = 0.6745 * (df[column] - median) / mad
                column_outliers = df[np.abs(modified_z_scores) > threshold].index

            else:
                raise ValueError(f"Unknown outlier detection method: {method}")

            outlier_indices.update(column_outliers)

        self.outlier_indices['statistical'] = list(outlier_indices)
        return list(outlier_indices)

    def isolation_forest_outliers(self, df, contamination=0.1, columns=None):
        """Detect outliers using Isolation Forest"""
        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns

        iso_forest = IsolationForest(contamination=contamination, random_state=42)
        outlier_labels = iso_forest.fit_predict(df[columns])

        outlier_indices = df[outlier_labels == -1].index.tolist()

        self.detectors['isolation_forest'] = iso_forest
        self.outlier_indices['isolation_forest'] = outlier_indices

        return outlier_indices

    def local_outlier_factor(self, df, n_neighbors=20, columns=None):
        """Detect outliers using Local Outlier Factor"""
        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns

        lof = LocalOutlierFactor(n_neighbors=n_neighbors)
        outlier_labels = lof.fit_predict(df[columns])

        outlier_indices = df[outlier_labels == -1].index.tolist()

        self.outlier_indices['lof'] = outlier_indices
        return outlier_indices

    def elliptic_envelope_outliers(self, df, contamination=0.1, columns=None):
        """Detect outliers using Elliptic Envelope"""
        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns

        ee = EllipticEnvelope(contamination=contamination, random_state=42)
        outlier_labels = ee.fit_predict(df[columns])

        outlier_indices = df[outlier_labels == -1].index.tolist()

        self.detectors['elliptic_envelope'] = ee
        self.outlier_indices['elliptic_envelope'] = outlier_indices

        return outlier_indices

    def treat_outliers(self, df, outlier_indices, method='remove'):
        """Treat detected outliers"""
        df_treated = df.copy()

        if method == 'remove':
            df_treated = df_treated.drop(outlier_indices)

        elif method == 'cap':
            numeric_columns = df.select_dtypes(include=[np.number]).columns

            for column in numeric_columns:
                Q1 = df[column].quantile(0.25)
                Q3 = df[column].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR

                df_treated[column] = df_treated[column].clip(lower=lower_bound,
                                                           upper=upper_bound)

        elif method == 'transform':
            # Log transformation for positive values
            numeric_columns = df.select_dtypes(include=[np.number]).columns

            for column in numeric_columns:
                if (df[column] > 0).all():
                    df_treated[column] = np.log1p(df[column])

        return df_treated

    def visualize_outliers(self, df, columns=None):
        """Visualize outliers in the data"""
        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns[:4]  # Limit to 4 columns

        fig, axes = plt.subplots(2, len(columns), figsize=(4*len(columns), 8))

        for i, column in enumerate(columns):
            # Box plot
            axes[0, i].boxplot(df[column].dropna())
            axes[0, i].set_title(f'{column} - Box Plot')

            # Histogram
            axes[1, i].hist(df[column].dropna(), bins=30, alpha=0.7)
            axes[1, i].set_title(f'{column} - Histogram')

        plt.tight_layout()
        plt.show()

# Example usage
def demonstrate_outlier_detection():
    # Create sample data with outliers
    np.random.seed(42)
    normal_data = np.random.randn(950, 3)
    outlier_data = np.random.randn(50, 3) * 5 + 10  # Outliers

    data = np.vstack([normal_data, outlier_data])
    df = pd.DataFrame(data, columns=['feature1', 'feature2', 'feature3'])

    detector = OutlierDetector()

    # Visualize data
    detector.visualize_outliers(df)

    # Detect outliers using different methods
    iqr_outliers = detector.statistical_outliers(df, method='iqr')
    iso_outliers = detector.isolation_forest_outliers(df, contamination=0.05)
    lof_outliers = detector.local_outlier_factor(df)

    print(f"IQR outliers detected: {len(iqr_outliers)}")
    print(f"Isolation Forest outliers detected: {len(iso_outliers)}")
    print(f"LOF outliers detected: {len(lof_outliers)}")

    # Treat outliers
    df_no_outliers = detector.treat_outliers(df, iqr_outliers, method='remove')
    df_capped = detector.treat_outliers(df, iqr_outliers, method='cap')

    print(f"\nOriginal data shape: {df.shape}")
    print(f"After removing outliers: {df_no_outliers.shape}")
    print(f"After capping outliers: {df_capped.shape}")

demonstrate_outlier_detection()

Feature Scaling and Normalization

Scaling Techniques

from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, PowerTransformer
from sklearn.preprocessing import QuantileTransformer, Normalizer

class FeatureScaler:
    def __init__(self):
        self.scalers = {}
        self.scaling_methods = {
            'standard': StandardScaler(),
            'minmax': MinMaxScaler(),
            'robust': RobustScaler(),
            'power': PowerTransformer(method='yeo-johnson'),
            'quantile_uniform': QuantileTransformer(output_distribution='uniform'),
            'quantile_normal': QuantileTransformer(output_distribution='normal'),
            'normalizer': Normalizer()
        }

    def fit_transform_scaling(self, df, method='standard', columns=None):
        """Fit and transform data using specified scaling method"""
        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns

        df_scaled = df.copy()
        scaler = self.scaling_methods[method]

        df_scaled[columns] = scaler.fit_transform(df[columns])
        self.scalers[method] = scaler

        return df_scaled

    def compare_scaling_methods(self, df, columns=None):
        """Compare different scaling methods"""
        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns[:2]  # Limit for visualization

        fig, axes = plt.subplots(2, 4, figsize=(16, 8))
        axes = axes.ravel()

        # Original data
        axes[0].scatter(df[columns[0]], df[columns[1]], alpha=0.6)
        axes[0].set_title('Original Data')
        axes[0].set_xlabel(columns[0])
        axes[0].set_ylabel(columns[1])

        # Different scaling methods
        methods = ['standard', 'minmax', 'robust', 'power', 'quantile_uniform', 'quantile_normal']

        for i, method in enumerate(methods, 1):
            df_scaled = self.fit_transform_scaling(df, method=method, columns=columns)
            axes[i].scatter(df_scaled[columns[0]], df_scaled[columns[1]], alpha=0.6)
            axes[i].set_title(f'{method.title()} Scaling')
            axes[i].set_xlabel(f'{columns[0]} (scaled)')
            axes[i].set_ylabel(f'{columns[1]} (scaled)')

        plt.tight_layout()
        plt.show()

        # Statistical comparison
        comparison_stats = {}
        for method in methods:
            df_scaled = self.fit_transform_scaling(df, method=method, columns=columns)
            comparison_stats[method] = {
                'mean': df_scaled[columns].mean().mean(),
                'std': df_scaled[columns].std().mean(),
                'min': df_scaled[columns].min().min(),
                'max': df_scaled[columns].max().max()
            }

        comparison_df = pd.DataFrame(comparison_stats).T
        print("Scaling Methods Comparison:")
        print(comparison_df.round(3))

        return comparison_df

    def transform_new_data(self, df, method='standard', columns=None):
        """Transform new data using fitted scaler"""
        if method not in self.scalers:
            raise ValueError(f"Scaler for method '{method}' not fitted yet")

        if columns is None:
            columns = df.select_dtypes(include=[np.number]).columns

        df_scaled = df.copy()
        df_scaled[columns] = self.scalers[method].transform(df[columns])

        return df_scaled

# Example usage
def demonstrate_feature_scaling():
    # Create sample data with different scales
    np.random.seed(42)
    data = {
        'small_values': np.random.randn(1000) * 0.1,
        'medium_values': np.random.randn(1000) * 10 + 50,
        'large_values': np.random.randn(1000) * 1000 + 5000,
        'skewed_values': np.random.exponential(2, 1000)
    }

    df = pd.DataFrame(data)

    scaler = FeatureScaler()

    print("Original Data Statistics:")
    print(df.describe())

    # Compare scaling methods
    comparison_stats = scaler.compare_scaling_methods(df)

    # Demonstrate specific scaling
    df_standard = scaler.fit_transform_scaling(df, method='standard')
    print(f"\nAfter Standard Scaling:")
    print(df_standard.describe())

demonstrate_feature_scaling()
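
The transform_new_data method above is only usable after a scaler has been fitted. A minimal sketch of that workflow, assuming a simple random train/test split of a small synthetic DataFrame and the FeatureScaler class defined above, fits the scaler on the training rows and reuses it for unseen rows so that test statistics never leak into the scaling:

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# Hypothetical two-column DataFrame with very different scales
df_demo = pd.DataFrame({'x': np.random.randn(200),
                        'y': np.random.randn(200) * 100 + 500})
train_df, test_df = train_test_split(df_demo, test_size=0.2, random_state=42)

scaler = FeatureScaler()
train_scaled = scaler.fit_transform_scaling(train_df, method='standard')  # fits and stores the scaler
test_scaled = scaler.transform_new_data(test_df, method='standard')       # reuses the fitted scaler
print(test_scaled.describe().round(3))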

Categorical Data Encoding

Encoding Techniques

from sklearn.preprocessing import LabelEncoder, OneHotEncoder, OrdinalEncoder
import category_encoders as ce  # requires: pip install category_encoders

class CategoricalEncoder:
    def __init__(self):
        self.encoders = {}
        self.encoding_mappings = {}

    def label_encoding(self, df, columns=None):
        """Label encoding for categorical variables"""
        if columns is None:
            columns = df.select_dtypes(include=['object']).columns

        df_encoded = df.copy()

        for column in columns:
            le = LabelEncoder()
            df_encoded[column] = le.fit_transform(df[column].astype(str))
            self.encoders[f'{column}_label'] = le

            # Store mapping for interpretation
            self.encoding_mappings[column] = dict(zip(le.classes_, le.transform(le.classes_)))

        return df_encoded

    def one_hot_encoding(self, df, columns=None, drop_first=False):
        """One-hot encoding for categorical variables"""
        if columns is None:
            columns = df.select_dtypes(include=['object']).columns

        df_encoded = df.copy()

        for column in columns:
            # Create dummy variables
            dummies = pd.get_dummies(df[column], prefix=column, drop_first=drop_first)

            # Drop original column and add dummy columns
            df_encoded = df_encoded.drop(column, axis=1)
            df_encoded = pd.concat([df_encoded, dummies], axis=1)

        return df_encoded

    def ordinal_encoding(self, df, columns=None, categories=None):
        """Ordinal encoding with custom ordering"""
        if columns is None:
            columns = df.select_dtypes(include=['object']).columns

        df_encoded = df.copy()

        for column in columns:
            if categories and column in categories:
                # Use custom ordering
                oe = OrdinalEncoder(categories=[categories[column]])
            else:
                # Use automatic ordering
                oe = OrdinalEncoder()

            df_encoded[column] = oe.fit_transform(df[[column]]).ravel()
            self.encoders[f'{column}_ordinal'] = oe

        return df_encoded

    def target_encoding(self, df, target_column, categorical_columns=None):
        """Target encoding (mean encoding)"""
        if categorical_columns is None:
            categorical_columns = df.select_dtypes(include=['object']).columns

        df_encoded = df.copy()

        for column in categorical_columns:
            if column != target_column:
                # Calculate mean target value for each category
                target_means = df.groupby(column)[target_column].mean()

                # Map categories to their target means
                df_encoded[f'{column}_target_encoded'] = df[column].map(target_means)

                # Store encoding for new data
                self.encoding_mappings[f'{column}_target'] = target_means.to_dict()

        return df_encoded

    def frequency_encoding(self, df, columns=None):
        """Frequency encoding"""
        if columns is None:
            columns = df.select_dtypes(include=['object']).columns

        df_encoded = df.copy()

        for column in columns:
            # Calculate frequency of each category
            freq_map = df[column].value_counts().to_dict()

            # Replace categories with their frequencies
            df_encoded[f'{column}_freq'] = df[column].map(freq_map)

            # Store mapping
            self.encoding_mappings[f'{column}_freq'] = freq_map

        return df_encoded

    def binary_encoding(self, df, columns=None):
        """Binary encoding for high cardinality categorical variables"""
        if columns is None:
            columns = df.select_dtypes(include=['object']).columns

        df_encoded = df.copy()

        for column in columns:
            be = ce.BinaryEncoder(cols=[column])
            binary_encoded = be.fit_transform(df[[column]])

            # Drop original column and add binary encoded columns
            df_encoded = df_encoded.drop(column, axis=1)
            df_encoded = pd.concat([df_encoded, binary_encoded], axis=1)

            self.encoders[f'{column}_binary'] = be

        return df_encoded

    def hash_encoding(self, df, columns=None, n_components=8):
        """Hash encoding for high cardinality categorical variables"""
        if columns is None:
            columns = df.select_dtypes(include=['object']).columns

        df_encoded = df.copy()

        for column in columns:
            he = ce.HashingEncoder(cols=[column], n_components=n_components)
            hash_encoded = he.fit_transform(df[[column]])

            # Drop original column and add hash encoded columns
            df_encoded = df_encoded.drop(column, axis=1)
            df_encoded = pd.concat([df_encoded, hash_encoded], axis=1)

            self.encoders[f'{column}_hash'] = he

        return df_encoded

    def compare_encoding_methods(self, df, target_column=None):
        """Compare different encoding methods"""
        categorical_columns = df.select_dtypes(include=['object']).columns

        if len(categorical_columns) == 0:
            print("No categorical columns found")
            return

        results = {}

        # Original data
        results['original'] = {
            'shape': df.shape,
            'memory_usage': df.memory_usage(deep=True).sum()
        }

        # Label encoding
        df_label = self.label_encoding(df, categorical_columns)
        results['label_encoding'] = {
            'shape': df_label.shape,
            'memory_usage': df_label.memory_usage(deep=True).sum()
        }

        # One-hot encoding
        df_onehot = self.one_hot_encoding(df, categorical_columns)
        results['one_hot_encoding'] = {
            'shape': df_onehot.shape,
            'memory_usage': df_onehot.memory_usage(deep=True).sum()
        }

        # Binary encoding
        df_binary = self.binary_encoding(df, categorical_columns)
        results['binary_encoding'] = {
            'shape': df_binary.shape,
            'memory_usage': df_binary.memory_usage(deep=True).sum()
        }

        # Frequency encoding
        df_freq = self.frequency_encoding(df, categorical_columns)
        results['frequency_encoding'] = {
            'shape': df_freq.shape,
            'memory_usage': df_freq.memory_usage(deep=True).sum()
        }

        # Target encoding (if target column provided)
        if target_column and target_column in df.columns:
            df_target = self.target_encoding(df, target_column, categorical_columns)
            results['target_encoding'] = {
                'shape': df_target.shape,
                'memory_usage': df_target.memory_usage(deep=True).sum()
            }

        # Create comparison DataFrame
        comparison_df = pd.DataFrame(results).T
        print("Encoding Methods Comparison:")
        print(comparison_df)

        return comparison_df

# Example usage
def demonstrate_categorical_encoding():
    # Create sample data with categorical variables
    np.random.seed(42)
    data = {
        'numeric_feature': np.random.randn(1000),
        'low_cardinality': np.random.choice(['A', 'B', 'C'], 1000),
        'medium_cardinality': np.random.choice([f'Cat_{i}' for i in range(10)], 1000),
        'high_cardinality': np.random.choice([f'Item_{i}' for i in range(100)], 1000),
        'ordinal_feature': np.random.choice(['Low', 'Medium', 'High'], 1000),
        'target': np.random.randint(0, 2, 1000)
    }

    df = pd.DataFrame(data)

    encoder = CategoricalEncoder()

    print("Original Data Info:")
    df.info()
    print(f"\nUnique values per categorical column:")
    for col in df.select_dtypes(include=['object']).columns:
        print(f"{col}: {df[col].nunique()} unique values")

    # Compare encoding methods
    comparison = encoder.compare_encoding_methods(df, target_column='target')

    # Demonstrate specific encodings
    print(f"\nLabel Encoding Example:")
    df_label = encoder.label_encoding(df, ['low_cardinality'])
    print(f"Mapping: {encoder.encoding_mappings['low_cardinality']}")

    print(f"\nTarget Encoding Example:")
    df_target = encoder.target_encoding(df, 'target', ['low_cardinality'])
    print(f"Target means: {encoder.encoding_mappings['low_cardinality_target']}")

demonstrate_categorical_encoding()

Text Preprocessing

Text Cleaning and Normalization

import re
import string
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.stem import PorterStemmer, WordNetLemmatizer
from nltk.chunk import ne_chunk
from nltk.tag import pos_tag
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
import spacy

# Download required NLTK data
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)
nltk.download('maxent_ne_chunker', quiet=True)
nltk.download('words', quiet=True)

class TextPreprocessor:
    def __init__(self, language='english'):
        self.language = language
        self.stop_words = set(stopwords.words(language))
        self.stemmer = PorterStemmer()
        self.lemmatizer = WordNetLemmatizer()

        # Try to load spaCy model
        try:
            self.nlp = spacy.load('en_core_web_sm')
        except OSError:
            print("spaCy model not found. Install with: python -m spacy download en_core_web_sm")
            self.nlp = None

    def basic_cleaning(self, text):
        """Basic text cleaning"""
        if not isinstance(text, str):
            return ""

        # Convert to lowercase
        text = text.lower()

        # Remove HTML tags
        text = re.sub(r'<[^>]+>', '', text)

        # Remove URLs
        text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)

        # Remove email addresses
        text = re.sub(r'\S+@\S+', '', text)

        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text).strip()

        return text

    def remove_punctuation(self, text, keep_apostrophes=True):
        """Remove punctuation from text"""
        if keep_apostrophes:
            # Keep apostrophes for contractions
            punctuation = string.punctuation.replace("'", "")
        else:
            punctuation = string.punctuation

        translator = str.maketrans('', '', punctuation)
        return text.translate(translator)

    def remove_numbers(self, text, replace_with=''):
        """Remove or replace numbers in text"""
        return re.sub(r'\d+', replace_with, text)

    def expand_contractions(self, text):
        """Expand common English contractions"""
        contractions = {
            "ain't": "is not", "aren't": "are not", "can't": "cannot",
            "couldn't": "could not", "didn't": "did not", "doesn't": "does not",
            "don't": "do not", "hadn't": "had not", "hasn't": "has not",
            "haven't": "have not", "he'd": "he would", "he'll": "he will",
            "he's": "he is", "i'd": "i would", "i'll": "i will",
            "i'm": "i am", "i've": "i have", "isn't": "is not",
            "it'd": "it would", "it'll": "it will", "it's": "it is",
            "let's": "let us", "shouldn't": "should not", "that's": "that is",
            "there's": "there is", "they'd": "they would", "they'll": "they will",
            "they're": "they are", "they've": "they have", "we'd": "we would",
            "we're": "we are", "we've": "we have", "weren't": "were not",
            "what's": "what is", "where's": "where is", "who's": "who is",
            "won't": "will not", "wouldn't": "would not", "you'd": "you would",
            "you'll": "you will", "you're": "you are", "you've": "you have"
        }

        for contraction, expansion in contractions.items():
            text = re.sub(contraction, expansion, text, flags=re.IGNORECASE)

        return text

    def tokenize(self, text, method='word'):
        """Tokenize text into words or sentences"""
        if method == 'word':
            return word_tokenize(text)
        elif method == 'sentence':
            return sent_tokenize(text)
        else:
            return text.split()

    def remove_stopwords(self, tokens, custom_stopwords=None):
        """Remove stopwords from token list"""
        stop_words = self.stop_words

        if custom_stopwords:
            stop_words = stop_words.union(set(custom_stopwords))

        return [token for token in tokens if token.lower() not in stop_words]

    def stem_tokens(self, tokens):
        """Apply stemming to tokens"""
        return [self.stemmer.stem(token) for token in tokens]

    def lemmatize_tokens(self, tokens):
        """Apply lemmatization to tokens"""
        return [self.lemmatizer.lemmatize(token) for token in tokens]

    def pos_tagging(self, tokens):
        """Part-of-speech tagging"""
        return pos_tag(tokens)

    def named_entity_recognition(self, tokens):
        """Named entity recognition"""
        pos_tags = pos_tag(tokens)
        return ne_chunk(pos_tags)

    def advanced_preprocessing_spacy(self, text):
        """Advanced preprocessing using spaCy"""
        if self.nlp is None:
            # Fall back to the basic pipeline when the spaCy model is unavailable
            tokens = self.basic_preprocessing(text)
            return {'processed_tokens': tokens, 'entities': [], 'sentences': []}

        doc = self.nlp(text)

        # Extract various linguistic features
        processed_tokens = []
        entities = []

        for token in doc:
            # Skip punctuation, spaces, and stop words
            if not token.is_punct and not token.is_space and not token.is_stop:
                # Use lemmatized form
                processed_tokens.append(token.lemma_.lower())

        # Extract named entities
        for ent in doc.ents:
            entities.append((ent.text, ent.label_))

        return {
            'processed_tokens': processed_tokens,
            'entities': entities,
            'sentences': [sent.text for sent in doc.sents]
        }

    def basic_preprocessing(self, text):
        """Complete basic preprocessing pipeline"""
        # Basic cleaning
        text = self.basic_cleaning(text)

        # Expand contractions
        text = self.expand_contractions(text)

        # Remove punctuation
        text = self.remove_punctuation(text)

        # Tokenize
        tokens = self.tokenize(text)

        # Remove stopwords
        tokens = self.remove_stopwords(tokens)

        # Lemmatize
        tokens = self.lemmatize_tokens(tokens)

        return tokens

    def preprocess_corpus(self, texts, method='basic'):
        """Preprocess a corpus of texts"""
        processed_texts = []

        for text in texts:
            if method == 'basic':
                processed = self.basic_preprocessing(text)
                processed_texts.append(' '.join(processed))
            elif method == 'advanced' and self.nlp:
                processed = self.advanced_preprocessing_spacy(text)
                processed_texts.append(' '.join(processed['processed_tokens']))
            else:
                processed_texts.append(self.basic_cleaning(text))

        return processed_texts

class TextVectorizer:
    def __init__(self):
        self.vectorizers = {}

    def bag_of_words(self, texts, max_features=1000, ngram_range=(1, 1)):
        """Create bag-of-words representation"""
        vectorizer = CountVectorizer(
            max_features=max_features,
            ngram_range=ngram_range,
            lowercase=True,
            stop_words='english'
        )

        bow_matrix = vectorizer.fit_transform(texts)
        self.vectorizers['bow'] = vectorizer

        return bow_matrix, vectorizer.get_feature_names_out()

    def tfidf_vectorization(self, texts, max_features=1000, ngram_range=(1, 1)):
        """Create TF-IDF representation"""
        vectorizer = TfidfVectorizer(
            max_features=max_features,
            ngram_range=ngram_range,
            lowercase=True,
            stop_words='english'
        )

        tfidf_matrix = vectorizer.fit_transform(texts)
        self.vectorizers['tfidf'] = vectorizer

        return tfidf_matrix, vectorizer.get_feature_names_out()

    def get_top_features(self, vectorizer_name, n_features=20):
        """Get top features from vectorizer"""
        if vectorizer_name not in self.vectorizers:
            return None

        vectorizer = self.vectorizers[vectorizer_name]

        if hasattr(vectorizer, 'idf_'):
            # TF-IDF vectorizer
            feature_names = vectorizer.get_feature_names_out()
            tfidf_scores = vectorizer.idf_

            # The lowest IDF scores correspond to the most common terms across the corpus
            top_indices = np.argsort(tfidf_scores)[:n_features]
            top_features = [(feature_names[i], tfidf_scores[i]) for i in top_indices]
        else:
            # Count vectorizer
            feature_names = vectorizer.get_feature_names_out()
            top_features = list(feature_names[:n_features])

        return top_features

# Example usage
def demonstrate_text_preprocessing():
    # Sample texts
    sample_texts = [
        "Hello! This is a sample text with URLs like https://example.com and emails like test@email.com.",
        "I can't believe it's already 2024! The weather is great today, isn't it?",
        "Machine Learning and Natural Language Processing are fascinating fields in AI.",
        "Don't forget to preprocess your text data before training your models!",
        "Text preprocessing includes cleaning, tokenization, and normalization steps."
    ]

    preprocessor = TextPreprocessor()
    vectorizer = TextVectorizer()

    print("Original texts:")
    for i, text in enumerate(sample_texts):
        print(f"{i+1}. {text}")

    # Basic preprocessing
    print(f"\nBasic preprocessing:")
    processed_texts = preprocessor.preprocess_corpus(sample_texts, method='basic')
    for i, text in enumerate(processed_texts):
        print(f"{i+1}. {text}")

    # Advanced preprocessing (if spaCy is available)
    if preprocessor.nlp:
        print(f"\nAdvanced preprocessing with spaCy:")
        for i, text in enumerate(sample_texts[:2]):  # Process first 2 texts
            result = preprocessor.advanced_preprocessing_spacy(text)
            print(f"{i+1}. Tokens: {result['processed_tokens']}")
            print(f"   Entities: {result['entities']}")

    # Vectorization
    print(f"\nText Vectorization:")

    # Bag of Words
    bow_matrix, bow_features = vectorizer.bag_of_words(processed_texts, max_features=50)
    print(f"Bag of Words shape: {bow_matrix.shape}")
    print(f"Sample features: {bow_features[:10]}")

    # TF-IDF
    tfidf_matrix, tfidf_features = vectorizer.tfidf_vectorization(processed_texts, max_features=50)
    print(f"TF-IDF shape: {tfidf_matrix.shape}")
    print(f"Sample features: {tfidf_features[:10]}")

demonstrate_text_preprocessing()

Conclusion

Data preprocessing is a fundamental skill in machine learning that significantly impacts model performance. This guide covered essential techniques for handling missing values, outliers, feature scaling, categorical encoding, and text preprocessing.

Next Steps

  1. Practice with real-world datasets
  2. Experiment with different preprocessing techniques
  3. Build automated preprocessing pipelines (see the sketch after this list)
  4. Study domain-specific preprocessing requirements
  5. Learn about data validation and monitoring techniques
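
For the third step, a minimal sketch of an automated preprocessing pipeline (assuming scikit-learn and a small hypothetical DataFrame; the column names here are purely illustrative) combines imputation, scaling, and one-hot encoding in a single ColumnTransformer:

import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Hypothetical DataFrame with missing values in a numeric and a categorical column
df = pd.DataFrame({'age': [25, np.nan, 40, 31],
                   'city': ['NY', 'SF', np.nan, 'NY']})

numeric_pipeline = Pipeline([('impute', SimpleImputer(strategy='median')),
                             ('scale', StandardScaler())])
categorical_pipeline = Pipeline([('impute', SimpleImputer(strategy='most_frequent')),
                                 ('encode', OneHotEncoder(handle_unknown='ignore'))])

preprocessor = ColumnTransformer([('num', numeric_pipeline, ['age']),
                                  ('cat', categorical_pipeline, ['city'])])

# Fit the whole pipeline once and reuse it for any new data with the same columns
print(preprocessor.fit_transform(df))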