Source code for binlearn.methods._dbscan_binning

"""
Clean DBSCAN binning implementation for the binlearn architecture.

This module provides DBSCANBinning that inherits from IntervalBinningBase.
Uses DBSCAN clustering to find natural density-based bin boundaries.
"""

from __future__ import annotations

from typing import Any

import numpy as np
from sklearn.cluster import DBSCAN

from ..base import IntervalBinningBase
from ..config import apply_config_defaults
from ..utils import (
    BinEdgesDict,
    ConfigurationError,
    apply_equal_width_fallback,
    create_param_dict_for_config,
    validate_positive_integer,
    validate_positive_number,
)


# pylint: disable=too-many-ancestors
class DBSCANBinning(IntervalBinningBase):
    """DBSCAN clustering-based binning implementation using clean architecture.

    Creates bins based on DBSCAN (Density-Based Spatial Clustering of
    Applications with Noise) clustering of each feature. Every column is
    clustered on its own as one-dimensional data; each cluster found becomes
    one bin, and the boundary between two neighbouring clusters is the
    midpoint between the maximum of the lower cluster and the minimum of the
    upper cluster.

    Unlike k-means or equal-width binning, DBSCAN does not assume a fixed
    number or shape of clusters, so the resulting bins correspond to dense
    regions that occur naturally in the data.

    When DBSCAN produces fewer clusters than ``min_bins``, the algorithm
    either falls back to equal-width binning (``allow_fallback=True``) or
    raises ``ConfigurationError`` (``allow_fallback=False``).

    Args:
        eps: The maximum distance between two samples for them to be
            considered as in the same neighborhood. Smaller values create
            more, smaller clusters; larger values merge clusters together.
            If None, uses the configuration default (0.1 if the
            configuration does not define one).
        min_samples: The minimum number of samples in a neighborhood for a
            point to be considered a core point (including the point
            itself). If None, uses the configuration default (5 if the
            configuration does not define one).
        min_bins: Minimum number of bins to create. Must be a positive
            integer. If None, uses the configuration default (2 if the
            configuration does not define one).
        allow_fallback: Whether to fall back to equal-width binning when
            DBSCAN produces fewer clusters than ``min_bins``. If False, a
            ``ConfigurationError`` is raised instead. If None, uses the
            configuration default (True if the configuration does not
            define one).
        clip: Whether to clip values outside the fitted range to the
            nearest bin edge. Forwarded to the base class. If None, uses
            the configuration default.
        preserve_dataframe: Whether to preserve pandas DataFrame structure
            in transform operations. Forwarded to the base class. If None,
            uses the configuration default.
        fit_jointly: Forwarded to the base class. The bin calculation in
            this class always works on one column at a time. If None, uses
            the configuration default.
        bin_edges: Pre-computed bin edges for reconstruction. Should not be
            provided during normal usage.
        bin_representatives: Pre-computed bin representatives for
            reconstruction. Should not be provided during normal usage.
        class_: Class name for reconstruction compatibility. Internal use
            only; ignored by this initializer.
        module_: Module name for reconstruction compatibility. Internal use
            only; ignored by this initializer.

    Attributes:
        eps: Maximum distance for neighborhood definition.
        min_samples: Minimum samples for core point definition.
        min_bins: Minimum number of bins to ensure.
        allow_fallback: Whether to fall back to equal-width binning when
            needed.

    Example:
        >>> import numpy as np
        >>> from binlearn.methods import DBSCANBinning
        >>>
        >>> # Create sample data with natural clusters
        >>> data = np.concatenate([
        ...     np.random.normal(0, 0.5, 100),   # First cluster
        ...     np.random.normal(5, 0.8, 150),   # Second cluster
        ...     np.random.normal(10, 0.3, 80)    # Third cluster
        ... ])
        >>>
        >>> # Initialize DBSCAN binning
        >>> binner = DBSCANBinning(eps=0.8, min_samples=10, min_bins=3)
        >>>
        >>> # Fit and transform
        >>> X = data.reshape(-1, 1)
        >>> binner.fit(X)
        >>> X_binned = binner.transform(X)
        >>>
        >>> # Check identified bins
        >>> print(f"Number of bins: {len(binner.bin_edges_[0]) - 1}")
        >>> print(f"Bin edges: {binner.bin_edges_[0]}")

    Note:
        - DBSCAN is particularly effective for data with natural
          density-based clusters.
        - The eps parameter requires careful tuning based on data scale and
          density.
        - Noise points lying between two clusters fall into the bin on
          their side of the midpoint boundary. Noise points beyond the
          outermost clusters lie outside the fitted edge range, because the
          outer edges are the extents of the first and last cluster.
        - Each column is processed independently (unsupervised approach).
        - Requires at least ``min_samples`` finite values per column.

    See Also:
        KMeansBinning: Alternative clustering-based binning with fixed
            cluster count.
        EqualWidthBinning: Simple equal-width interval binning.
        GaussianMixtureBinning: Probabilistic clustering-based binning.
    """

    # pylint: disable=too-many-arguments,too-many-positional-arguments
    def __init__(
        self,
        eps: float | None = None,
        min_samples: int | None = None,
        min_bins: int | None = None,
        allow_fallback: bool | None = None,
        clip: bool | None = None,
        preserve_dataframe: bool | None = None,
        fit_jointly: bool | None = None,
        *,
        bin_edges: BinEdgesDict | None = None,
        bin_representatives: BinEdgesDict | None = None,
        class_: (  # pylint: disable=unused-argument
            str | None
        ) = None,  # For reconstruction compatibility
        module_: (  # pylint: disable=unused-argument
            str | None
        ) = None,  # For reconstruction compatibility
    ):
        """Initialize DBSCAN binning with clustering parameters.

        Parameters left as None are resolved through
        ``apply_config_defaults("dbscan", ...)``; if the resolved
        configuration still lacks a method-specific key, the hard-coded
        defaults below are used.

        Args:
            eps: Maximum distance between two samples for neighborhood
                definition. Must be positive. Defaults to 0.1 when neither
                the caller nor the configuration provides a value.
            min_samples: Minimum number of samples in a neighborhood for a
                core point. Must be a positive integer. Defaults to 5 when
                neither the caller nor the configuration provides a value.
            min_bins: Minimum number of bins to ensure. Must be a positive
                integer. Defaults to 2 when neither the caller nor the
                configuration provides a value.
            allow_fallback: Whether to fall back to equal-width binning
                when DBSCAN finds fewer clusters than ``min_bins``.
                Defaults to True when neither the caller nor the
                configuration provides a value.
            clip: Whether to clip transformed values outside the fitted
                range to the nearest bin edge. Forwarded to the base class.
            preserve_dataframe: Whether to preserve pandas DataFrame
                structure in transform operations. Forwarded to the base
                class.
            fit_jointly: Forwarded to the base class.
            bin_edges: Pre-computed bin edges dictionary for
                reconstruction. Internal use only.
            bin_representatives: Pre-computed representatives dictionary
                for reconstruction. Internal use only.
            class_: Class name string for reconstruction compatibility.
                Accepted and ignored.
            module_: Module name string for reconstruction compatibility.
                Accepted and ignored.

        Example:
            >>> # Standard initialization with custom parameters
            >>> binner = DBSCANBinning(eps=0.5, min_samples=8, min_bins=3)
            >>>
            >>> # Use configuration defaults
            >>> binner = DBSCANBinning()
            >>>
            >>> # Custom clustering with clipping enabled
            >>> binner = DBSCANBinning(
            ...     eps=1.2,
            ...     min_samples=15,
            ...     min_bins=4,
            ...     clip=True,
            ...     preserve_dataframe=True
            ... )
        """
        # Use standardized initialization pattern
        user_params = create_param_dict_for_config(
            eps=eps,
            min_samples=min_samples,
            min_bins=min_bins,
            allow_fallback=allow_fallback,
            clip=clip,
            preserve_dataframe=preserve_dataframe,
            fit_jointly=fit_jointly,
        )

        # Apply configuration defaults
        resolved_params = apply_config_defaults("dbscan", user_params)

        # Store method-specific parameters before the base initializer runs.
        # NOTE(review): the base initializer presumably triggers
        # _validate_params, which reads these attributes - keep this order.
        self.eps = resolved_params.get("eps", 0.1)
        self.min_samples = resolved_params.get("min_samples", 5)
        self.min_bins = resolved_params.get("min_bins", 2)
        self.allow_fallback = resolved_params.get("allow_fallback", True)

        # Initialize parent with resolved parameters
        IntervalBinningBase.__init__(
            self,
            clip=resolved_params.get("clip"),
            preserve_dataframe=resolved_params.get("preserve_dataframe"),
            fit_jointly=resolved_params.get("fit_jointly"),
            guidance_columns=None,
            bin_edges=bin_edges,
            bin_representatives=bin_representatives,
        )

    def _validate_params(self) -> None:
        """Validate DBSCAN binning parameters.

        Runs the base-class validation first, then checks that ``eps`` is a
        strictly positive number and that ``min_samples`` and ``min_bins``
        are positive integers, using the shared validation utilities.
        """
        # Call parent validation
        IntervalBinningBase._validate_params(self)

        # Use standardized validation utilities
        validate_positive_number(self.eps, "eps", allow_zero=False)
        validate_positive_integer(self.min_samples, "min_samples")
        validate_positive_integer(self.min_bins, "min_bins")

    def _calculate_bins(
        self,
        x_col: np.ndarray[Any, Any],
        col_id: Any,
        guidance_data: np.ndarray[Any, Any] | None = None,
    ) -> tuple[list[float], list[float]]:
        """Calculate DBSCAN clustering-based bins for a single column.

        Thin adapter that delegates to ``_create_dbscan_bins``.

        Args:
            x_col: Column data to bin.
            col_id: Column identifier for error reporting.
            guidance_data: Not used for DBSCAN binning (unsupervised).

        Returns:
            Tuple of (bin_edges, bin_representatives).

        Raises:
            ValueError: If the column has fewer finite values than
                ``min_samples``.
            ConfigurationError: If too few clusters are found and
                ``allow_fallback`` is False.
        """
        return self._create_dbscan_bins(x_col, col_id)

    # pylint: disable=too-many-locals
    def _create_dbscan_bins(
        self,
        x_col: np.ndarray[Any, Any],
        col_id: Any,
    ) -> tuple[list[float], list[float]]:
        """Create DBSCAN clustering-based bins.

        Args:
            x_col: Column data that may contain NaN/inf values.
            col_id: Column identifier for error reporting.

        Returns:
            Tuple of (bin_edges, bin_representatives) as plain Python
            floats. With ``k`` bins there are ``k + 1`` edges and ``k``
            representatives.

        Raises:
            ValueError: If fewer than ``min_samples`` finite values remain
                after filtering.
            ConfigurationError: If DBSCAN finds fewer than ``min_bins``
                clusters and ``allow_fallback`` is False.
        """
        # Filter out NaN and infinite values for DBSCAN fitting
        x_col_clean = x_col[np.isfinite(x_col)]

        if len(x_col_clean) < self.min_samples:
            raise ValueError(
                f"Column {col_id}: Insufficient finite values ({len(x_col_clean)}) "
                f"for DBSCAN clustering. Need at least {self.min_samples} values."
            )

        # DBSCAN expects a 2D array of shape (n_samples, n_features)
        dbscan = DBSCAN(eps=self.eps, min_samples=self.min_samples)
        cluster_labels = dbscan.fit_predict(x_col_clean.reshape(-1, 1))

        # Get unique clusters (excluding noise points labeled as -1)
        unique_clusters = np.unique(cluster_labels[cluster_labels != -1])
        n_clusters = len(unique_clusters)

        if n_clusters < self.min_bins:
            if not self.allow_fallback:
                suggestions = [
                    "Adjust eps parameter to find more clusters",
                    "Reduce min_samples parameter",
                    "Set allow_fallback=True to enable equal-width fallback",
                ]
                # min_bins must be a positive integer, so only suggest
                # lowering it when at least one cluster was actually found.
                if n_clusters >= 1:
                    suggestions.insert(0, f"Reduce min_bins to {n_clusters} or lower")
                raise ConfigurationError(
                    f"DBSCAN found only {n_clusters} clusters, "
                    f"but min_bins={self.min_bins}",
                    suggestions=suggestions,
                )

            # Fall back to equal-width binning if too few clusters
            fallback_edges = [
                float(edge)
                for edge in apply_equal_width_fallback(x_col_clean, self.min_bins, "DBSCAN")
            ]
            # Representatives are the bin midpoints, derived from the edges
            # actually returned so there is exactly one per bin and each
            # lies inside its bin.
            fallback_reps = [
                (lower + upper) / 2.0
                for lower, upper in zip(fallback_edges[:-1], fallback_edges[1:])
            ]
            return fallback_edges, fallback_reps

        # Summarize each cluster as (center, minimum, maximum), cast to
        # built-in floats so the returned lists match the annotation.
        cluster_extents: list[tuple[float, float, float]] = []
        for cluster_id in unique_clusters:
            cluster_data = x_col_clean[cluster_labels == cluster_id]
            cluster_extents.append(
                (
                    float(np.mean(cluster_data)),
                    float(np.min(cluster_data)),
                    float(np.max(cluster_data)),
                )
            )

        # Sort clusters by center so edges come out in ascending order
        cluster_extents.sort(key=lambda extent: extent[0])

        # Outer edges are the extents of the first and last cluster; inner
        # edges are midpoints of the gaps between neighbouring clusters.
        edges = [cluster_extents[0][1]]
        edges.extend(
            (lower[2] + upper[1]) / 2.0
            for lower, upper in zip(cluster_extents[:-1], cluster_extents[1:])
        )
        edges.append(cluster_extents[-1][2])

        # Representatives are cluster centers
        reps = [center for center, _, _ in cluster_extents]

        return edges, reps