Source code for snputils.phenotype.io.read.phenotypeReader
import warnings
from pathlib import Path
from typing import Optional, Union
import pandas as pd
from .base import PhenotypeBaseReader
from snputils.phenotype.genobj import PhenotypeObject
[docs]
class PhenotypeReader(PhenotypeBaseReader):
"""
Reader for single-trait phenotype files (any extension; common: .txt, .phe, .pheno).
Expected format (headered, whitespace-delimited):
- Must include `IID` (optionally preceded by `FID`)
- First phenotype column after `IID` is used by default
"""
def __init__(self, file: Union[str, Path]) -> None:
super().__init__(file)
@property
def file(self) -> Path:
return Path(self._file)
@staticmethod
def _has_header_with_iid(file_path: Path) -> bool:
with open(file_path, "r", encoding="utf-8") as handle:
for raw_line in handle:
line = raw_line.strip()
if not line:
continue
tokens = line.split()
return any(token.lstrip("#").upper() == "IID" for token in tokens)
raise ValueError("Empty phenotype file.")
@staticmethod
def _resolve_column(columns, normalized_columns, requested: str) -> Optional[str]:
requested_norm = str(requested).lstrip("#").upper()
for col, col_norm in zip(columns, normalized_columns):
if str(col) == str(requested) or col_norm == requested_norm:
return str(col)
return None
[docs]
def read(
self,
phenotype_col: Optional[str] = None,
quantitative: Optional[bool] = None,
) -> PhenotypeObject:
file_path = self.file
if not file_path.exists():
raise FileNotFoundError(f"Phenotype file not found: '{file_path}'")
has_iid_header = self._has_header_with_iid(file_path)
if has_iid_header:
phen_df = pd.read_csv(file_path, sep=r"\s+", dtype=str)
else:
warnings.warn(
(
"Phenotype file has no header/IID column. Legacy 3-column parsing "
"(FID IID PHENO) is deprecated; please switch to a headered format."
),
UserWarning,
stacklevel=2,
)
legacy = pd.read_csv(file_path, header=None, sep=r"\s+", dtype=str)
if legacy.shape[1] < 3:
raise ValueError(
"Legacy phenotype parsing expects at least 3 columns: FID IID PHENO."
)
phen_df = legacy.iloc[:, :3].copy()
phen_df.columns = ["FID", "IID", "PHENO"]
if phen_df.empty:
raise ValueError("Empty phenotype file.")
columns = [str(col) for col in phen_df.columns]
normalized_columns = [col.lstrip("#").upper() for col in columns]
if "IID" not in normalized_columns:
raise ValueError("Phenotype file must include an IID column in the header.")
iid_col = columns[normalized_columns.index("IID")]
iid_series = phen_df[iid_col].astype(str).str.strip()
if iid_series.eq("").any():
raise ValueError("Phenotype IID column contains empty values.")
if iid_series.duplicated().any():
raise ValueError("Phenotype IID values must be unique.")
if phenotype_col is not None:
resolved = self._resolve_column(columns, normalized_columns, phenotype_col)
if resolved is None:
raise ValueError(
f"Phenotype column '{phenotype_col}' not found in header: {columns}"
)
target_col = resolved
else:
iid_idx = normalized_columns.index("IID")
if iid_idx + 1 >= len(columns):
raise ValueError(
"Phenotype file must include at least one phenotype column after IID."
)
target_col = columns[iid_idx + 1]
values = pd.to_numeric(phen_df[target_col], errors="coerce")
if values.isna().any():
bad_examples = phen_df.loc[values.isna(), target_col].astype(str).head(5).tolist()
raise ValueError(
f"Phenotype column '{target_col}' contains non-numeric or missing values: "
f"{bad_examples}"
)
phenotype_name = str(target_col).lstrip("#")
return PhenotypeObject(
samples=iid_series.tolist(),
values=values.to_numpy(),
phenotype_name=phenotype_name,
quantitative=quantitative,
)
PhenotypeBaseReader.register(PhenotypeReader)