Source code for snputils.phenotype.genobj.multi_phenobj

import copy
from typing import List, Optional, Sequence, Union

import numpy as np
import pandas as pd

from snputils._utils.printing import array_shape, format_repr


class MultiPhenotypeObject:
    """Sample-aligned table for multiple phenotypes or sample traits.

    The table is stored as a pandas DataFrame whose first column holds the
    sample IDs and whose remaining columns hold one phenotype each. On
    construction (and whenever ``phen_df`` is reassigned) the frame is
    normalized: column labels are converted to strings, the sample column is
    moved to the front, sample IDs are converted to stripped strings, and the
    row index is reset.
    """

    def __init__(
        self,
        phen_df: pd.DataFrame,
        sample_column: Optional[str] = None,
    ) -> None:
        """Build the object from a phenotype table.

        Args:
            phen_df: Table with one sample-ID column and at least one
                phenotype column.
            sample_column: Label of the sample-ID column. When ``None`` the
                first column is used.

        Raises:
            TypeError: If ``phen_df`` is not a pandas DataFrame.
            ValueError: If the table is empty, has fewer than two columns,
                has duplicate column labels, lacks ``sample_column``, or has
                missing, empty, or duplicate sample IDs.
        """
        normalized_df, resolved_sample_column = self._normalize_frame(
            phen_df,
            sample_column=sample_column,
        )
        self._phen_df = normalized_df
        self._sample_column = resolved_sample_column

    @staticmethod
    def _normalize_frame(
        phen_df: pd.DataFrame,
        *,
        sample_column: Optional[str],
    ) -> tuple[pd.DataFrame, str]:
        """Validate ``phen_df`` and return a normalized copy.

        Args:
            phen_df: Candidate phenotype table.
            sample_column: Label of the sample-ID column, or ``None`` to use
                the first column.

        Returns:
            A ``(frame, sample_column)`` pair. ``frame`` is a copy with string
            column labels, the sample column first, stripped string sample
            IDs, and a fresh ``RangeIndex``. ``sample_column`` is the resolved
            (string) label of the sample column.

        Raises:
            TypeError: If ``phen_df`` is not a pandas DataFrame.
            ValueError: If any structural or sample-ID check fails.
        """
        if not isinstance(phen_df, pd.DataFrame):
            raise TypeError("phen_df must be a pandas DataFrame.")
        if phen_df.empty:
            raise ValueError("MultiPhenotypeObject contains no samples.")
        if phen_df.shape[1] < 2:
            raise ValueError(
                "MultiPhenotypeObject requires a sample column and at least one phenotype column."
            )

        columns = [str(col) for col in phen_df.columns]
        if len(set(columns)) != len(columns):
            raise ValueError("Phenotype column names must be unique.")

        resolved = columns[0] if sample_column is None else str(sample_column)
        if resolved not in columns:
            raise ValueError(
                f"Sample column '{resolved}' not found in phenotype table: {columns}"
            )

        # Reorder by position, not by label: the labels in `columns` were
        # stringified, so label-based selection on the original frame would
        # raise KeyError for non-string labels (e.g. the default 0, 1, ...).
        sample_pos = columns.index(resolved)
        order = [sample_pos] + [i for i in range(len(columns)) if i != sample_pos]
        normalized = phen_df.iloc[:, order].copy()
        normalized.columns = [columns[i] for i in order]
        normalized = normalized.reset_index(drop=True)

        raw_ids = normalized[resolved]
        # Missing IDs would otherwise be stringified (e.g. to "nan") and
        # silently accepted as a real sample name.
        if raw_ids.isna().any():
            raise ValueError("Phenotype sample IDs contain empty values.")

        sample_series = raw_ids.astype(str).str.strip()
        if sample_series.eq("").any():
            raise ValueError("Phenotype sample IDs contain empty values.")
        if sample_series.duplicated().any():
            raise ValueError("Phenotype sample IDs must be unique.")

        # Replace the whole column by label so its dtype can change freely;
        # writing strings into a numeric column through `.iloc` is deprecated
        # in recent pandas releases.
        normalized[resolved] = sample_series
        return normalized, resolved

    def __getitem__(self, key):
        """Return the attribute named ``key``; raise ``KeyError`` if absent."""
        try:
            return getattr(self, key)
        except AttributeError as exc:
            raise KeyError(f"Invalid key: {key}") from exc

    def __setitem__(self, key, value):
        """Set the attribute named ``key``; raise ``KeyError`` if not settable."""
        try:
            setattr(self, key, value)
        except AttributeError as exc:
            raise KeyError(f"Invalid key: {key}") from exc

    def __repr__(self) -> str:
        return format_repr(
            self,
            shape=self.shape,
            n_samples=self.n_samples,
            n_phenotypes=self.n_phenotypes,
            sample_column=self.sample_column,
        )

    def __str__(self) -> str:
        return self.__repr__()

    @property
    def phen_df(self) -> pd.DataFrame:
        """The normalized phenotype table (sample column first)."""
        return self._phen_df

    @phen_df.setter
    def phen_df(self, x: pd.DataFrame):
        """Replace the table; ``x`` is validated against the current sample column."""
        normalized_df, resolved_sample_column = self._normalize_frame(
            x,
            sample_column=self._sample_column,
        )
        self._phen_df = normalized_df
        self._sample_column = resolved_sample_column

    @property
    def sample_column(self) -> str:
        """Label of the sample-ID column."""
        return self._sample_column

    @property
    def samples(self) -> List[str]:
        """Sample IDs in row order."""
        return self._phen_df.iloc[:, 0].astype(str).tolist()

    @property
    def n_samples(self) -> int:
        """Number of rows (samples)."""
        return len(self._phen_df)

    @property
    def phenotype_names(self) -> List[str]:
        """Labels of every column after the sample column."""
        return [str(col) for col in self._phen_df.columns[1:].tolist()]

    @property
    def n_phenotypes(self) -> int:
        """Number of phenotype columns."""
        return len(self.phenotype_names)

    @property
    def values(self) -> np.ndarray:
        """Phenotype columns (sample column excluded) as a NumPy array."""
        return self._phen_df.iloc[:, 1:].to_numpy()

    @property
    def shape(self) -> tuple[int, int]:
        """Shape of the stored table, including the sample column."""
        phen_shape = array_shape(self._phen_df)
        if phen_shape is None:
            return (self.n_samples, self.n_phenotypes + 1)
        return phen_shape

    def copy(self):
        """Return a new object built from a copy of the stored table."""
        return MultiPhenotypeObject(self._phen_df.copy(), sample_column=self._sample_column)

    def keys(self) -> List[str]:
        """Names of the attributes exposed through ``obj[key]`` access."""
        return [
            "phen_df",
            "sample_column",
            "samples",
            "n_samples",
            "phenotype_names",
            "n_phenotypes",
            "values",
        ]

    def filter_samples(
        self,
        samples: Optional[Union[str, Sequence[str], np.ndarray]] = None,
        indexes: Optional[Union[int, Sequence[int], np.ndarray]] = None,
        include: bool = True,
        reorder: bool = False,
        inplace: bool = False,
    ) -> Optional["MultiPhenotypeObject"]:
        """Filter rows by sample ID or row index.

        Rows matched by ``samples`` and rows matched by ``indexes`` are
        combined (union).

        Args:
            samples: Sample ID or IDs to select. Values are compared as
                strings, because stored IDs are always strings; IDs that are
                not present are ignored.
            indexes: Row position or positions to select. Negative values
                count from the end.
            include: If ``True`` keep the selected rows; if ``False`` drop
                them and keep the rest.
            reorder: Only used when ``include`` is ``True``. Rows are returned
                in request order: matches for ``samples`` first, then
                ``indexes``, each row at most once. Otherwise the original
                row order is kept.
            inplace: If ``True`` update this object and return ``None``;
                otherwise return a new object.

        Returns:
            The filtered object, or ``None`` when ``inplace`` is ``True``.

        Raises:
            ValueError: If neither ``samples`` nor ``indexes`` is given, or if
                the filter leaves no rows (this object is left unchanged).
            IndexError: If any entry of ``indexes`` is out of bounds.
        """
        if samples is None and indexes is None:
            raise ValueError("At least one of 'samples' or 'indexes' must be provided.")

        n_samples = self.n_samples
        selected = np.zeros(n_samples, dtype=bool)

        sample_hits: List[int] = []
        if samples is not None:
            # Sample IDs are unique, so one dict gives O(1) position lookup.
            position = {name: i for i, name in enumerate(self.samples)}
            requested = [str(item) for item in np.asarray(samples).ravel()]
            sample_hits = [position[name] for name in requested if name in position]
            selected[np.asarray(sample_hits, dtype=int)] = True

        index_hits: List[int] = []
        if indexes is not None:
            idx_arr = np.asarray(indexes).ravel()
            if idx_arr.size == 0:
                # An empty selection defaults to float64, which NumPy refuses
                # to use as an index.
                idx_arr = idx_arr.astype(int)
            if np.any(idx_arr >= n_samples) or np.any(idx_arr < -n_samples):
                raise IndexError("One or more sample indexes are out of bounds.")
            idx_arr = np.mod(idx_arr, n_samples)  # map negatives to positions
            selected[idx_arr] = True
            index_hits = [int(i) for i in idx_arr]

        if include and reorder:
            # dict.fromkeys drops repeats while keeping first-seen order.
            ordered = list(dict.fromkeys(sample_hits + index_hits))
            result_df = self._phen_df.iloc[ordered].reset_index(drop=True)
        else:
            keep = selected if include else ~selected
            result_df = self._phen_df.loc[keep].reset_index(drop=True)

        # Validate through the constructor before touching `self`, so an
        # empty result raises the same error for both values of `inplace`.
        filtered = MultiPhenotypeObject(result_df, sample_column=self._sample_column)
        if inplace:
            self._phen_df = filtered._phen_df
            return None
        return filtered