Source code for snputils.phenotype.genobj.phenobj

import copy
from typing import List, Optional, Sequence

import numpy as np

from snputils._utils.printing import array_shape, format_repr


[docs] class PhenotypeObject: """ Generic phenotype container for single-trait analyses. The object stores sample IDs, normalized phenotype values, inferred/declared trait type, and binary case/control convenience attributes. """ def __init__( self, samples: Sequence[str], values: Sequence[float], phenotype_name: str = "PHENO", quantitative: Optional[bool] = None, ) -> None: sample_ids = [str(sample) for sample in samples] if len(sample_ids) == 0: raise ValueError("Phenotype file contains no samples.") if len(set(sample_ids)) != len(sample_ids): raise ValueError("Phenotype sample IDs must be unique.") try: values_f64 = np.asarray(values, dtype=np.float64) except (TypeError, ValueError) as exc: raise ValueError("Phenotype values must be numeric.") from exc if values_f64.ndim != 1: raise ValueError("Phenotype values must be a 1-dimensional array.") if values_f64.size != len(sample_ids): raise ValueError( "Phenotype sample/value length mismatch: " f"{len(sample_ids)} samples but {values_f64.size} values." ) if not np.all(np.isfinite(values_f64)): raise ValueError("Phenotype contains non-finite values (NaN/Inf).") trait_is_quantitative = ( self._infer_quantitative(values_f64) if quantitative is None else bool(quantitative) ) if trait_is_quantitative: if float(np.var(values_f64)) <= 0.0: raise ValueError("Quantitative phenotype has zero variance.") normalized_values = values_f64 cases: List[str] = [] controls: List[str] = sample_ids.copy() else: normalized_values = self._normalize_binary(values_f64) case_mask = normalized_values == 1 control_mask = normalized_values == 0 cases = [sample_ids[idx] for idx in np.where(case_mask)[0].tolist()] controls = [sample_ids[idx] for idx in np.where(control_mask)[0].tolist()] if len(cases) == 0: raise ValueError("No case data available.") if len(controls) == 0: raise ValueError("No control data available.") self._samples = sample_ids self._values = normalized_values self._phenotype_name = str(phenotype_name) self._is_quantitative = trait_is_quantitative self._cases = cases self._controls = controls self._all_haplotypes = [f"{sample}.0" for sample in sample_ids] + [ f"{sample}.1" for sample in sample_ids ] self._cases_haplotypes = [f"{sample}.0" for sample in cases] + [ f"{sample}.1" for sample in cases ] self._controls_haplotypes = [f"{sample}.0" for sample in controls] + [ f"{sample}.1" for sample in controls ] @staticmethod def _matches_binary_encoding(values_f64: np.ndarray, encoding: Sequence[float]) -> bool: unique_vals = np.unique(values_f64) if unique_vals.size != 2: return False target = np.asarray(sorted(float(v) for v in encoding), dtype=np.float64) observed = np.asarray(sorted(unique_vals.tolist()), dtype=np.float64) return bool(np.allclose(observed, target, rtol=0.0, atol=1e-8)) @staticmethod def _infer_quantitative(values_f64: np.ndarray) -> bool: return not ( PhenotypeObject._matches_binary_encoding(values_f64, (0.0, 1.0)) or PhenotypeObject._matches_binary_encoding(values_f64, (1.0, 2.0)) ) @staticmethod def _normalize_binary(values_f64: np.ndarray) -> np.ndarray: unique_vals = np.unique(values_f64) if PhenotypeObject._matches_binary_encoding(values_f64, (1.0, 2.0)): return np.isclose(values_f64, 2.0, rtol=0.0, atol=1e-8).astype(np.int8) if PhenotypeObject._matches_binary_encoding(values_f64, (0.0, 1.0)): return values_f64.astype(np.int8) raise ValueError( "Binary phenotype must use exactly two levels encoded as {1,2} or {0,1}. " f"Observed unique values: {sorted(unique_vals.tolist())}" ) def __getitem__(self, key): try: return getattr(self, key) except AttributeError as exc: raise KeyError(f"Invalid key: {key}") from exc def __setitem__(self, key, value): try: setattr(self, key, value) except AttributeError as exc: raise KeyError(f"Invalid key: {key}") from exc def __repr__(self) -> str: fields = { "phenotype_name": self._phenotype_name, "shape": self.shape, "n_samples": self.n_samples, "trait_type": "quantitative" if self._is_quantitative else "binary", } if not self._is_quantitative: fields["n_cases"] = self.n_cases fields["n_controls"] = self.n_controls return format_repr(self, **fields) def __str__(self) -> str: return self.__repr__() @property def samples(self) -> List[str]: return self._samples @property def n_samples(self) -> int: return len(self._samples) @property def values(self) -> np.ndarray: return self._values @property def shape(self) -> tuple[int, ...]: return array_shape(self._values) or (self.n_samples,) @property def y(self) -> np.ndarray: return self._values @property def phenotype_name(self) -> str: return self._phenotype_name @property def is_quantitative(self) -> bool: return self._is_quantitative @property def quantitative(self) -> bool: return self._is_quantitative @property def cases(self) -> List[str]: return self._cases @property def n_cases(self) -> int: return len(self._cases) @property def controls(self) -> List[str]: return self._controls @property def n_controls(self) -> int: return len(self._controls) @property def all_haplotypes(self) -> List[str]: return self._all_haplotypes @property def cases_haplotypes(self) -> List[str]: return self._cases_haplotypes @property def controls_haplotypes(self) -> List[str]: return self._controls_haplotypes def copy(self): return copy.copy(self) def keys(self) -> List[str]: return [ "samples", "n_samples", "values", "y", "phenotype_name", "is_quantitative", "quantitative", "cases", "n_cases", "controls", "n_controls", "all_haplotypes", "cases_haplotypes", "controls_haplotypes", ]