from __future__ import annotations
import logging
import numpy as np
import pandas as pd
import copy
from typing import Any, Union, Tuple, List, Optional
import pygrgl as pyg
import subprocess
log = logging.getLogger(__name__)
import tempfile
from snputils._utils.printing import format_repr
GRGType = Union[pyg.GRG, pyg.MutableGRG]
[docs]
class GRGObject:
"""
A class for Single Nucleotide Polymorphism (SNP) data.
"""
def __init__(
self,
genotypes: Optional[GRGType] = None,
filename: Optional[str] = None,
mutable: Optional[bool] = None
) -> None:
"""
Args:
genotypes (GRG | MutableGRG, optional):
A Genotype Representation Graph containing genotype data for each sample.
filename (str, optional)
File storing the GRG.
"""
self.__genotypes = genotypes
self.__filename = filename
self.__mutable = mutable
self.__latest = False
def __getitem__(self, key: str) -> Any:
"""
To access an attribute of the class using the square bracket notation,
similar to a dictionary.
"""
try:
return getattr(self, key)
except AttributeError:
raise KeyError(f'Invalid key: {key}.')
def __setitem__(self, key: str, value: Any):
"""
To set an attribute of the class using the square bracket notation,
similar to a dictionary.
"""
try:
setattr(self, key, value)
except AttributeError:
raise KeyError(f'Invalid key: {key}.')
def __repr__(self) -> str:
return format_repr(
self,
shape=self.shape,
n_snps=self._grg_count("num_mutations"),
n_haplotypes=self._grg_count("num_samples"),
n_samples=self._n_samples_or_none(),
filename=self.__filename,
mutable=self.__mutable,
loaded=self.__genotypes is not None,
)
def __str__(self) -> str:
return self.__repr__()
def _grg_count(self, attr: str) -> Optional[int]:
if self.__genotypes is None:
return None
value = getattr(self.__genotypes, attr, None)
if value is None:
return None
return int(value)
def _n_samples_or_none(self) -> Optional[int]:
try:
return self.n_samples()
except Exception:
return None
@property
def genotypes(self) -> Optional[GRGType]:
"""
Retrieve `genotypes`.
Returns:
GRG | MutableGRG:
An GRG containing genotype data for all samples.
"""
return self.__genotypes
@genotypes.setter
def genotypes(self, x: GRGType):
"""
Update `genotypes`.
"""
self.__genotypes = x
@property
def filename(self) -> str:
"""
Retrieve `filename`.
Returns:
str
A string containing the file name.
"""
return self.__filename
@filename.setter
def filename(self, x: str):
"""
Update `filename`.
"""
self.__filename = x
@property
def mutable(self) -> Optional[bool]:
return self.__mutable
@property
def shape(self) -> Tuple[Optional[int], Optional[int]]:
"""
Retrieve the graph genotype shape as `(n_mutations, n_haplotypes)`.
"""
return (self._grg_count("num_mutations"), self._grg_count("num_samples"))
def allele_freq(self) -> np.ndarray:
# allele frequency array
al_freq = np.ones(self.genotypes.num_samples) / self.genotypes.num_samples
return pyg.dot_product(self.genotypes, al_freq, pyg.TraversalDirection.UP)
def dot_product(self, array: np.ndarray, traversal_direction: pyg.TraversalDirection):
return pyg.dot_product(self.genotypes, array, traversal_direction)
# TODO: consider moving this elsewhere.
def allele_freq_from_file(self, filename: Optional[str] = None) -> pd.DataFrame:
newfile = filename if filename is not None else self.__filename
if newfile is None:
raise ValueError("Either pass in a filename, or store an existing GRG filename.")
with tempfile.NamedTemporaryFile() as fp:
subprocess.run(["grg", "process", "freq", f"{newfile}"], stdout=fp, check=True)
fp.seek(0) # set the file cursor
return pd.read_csv(fp.name, sep="\t")
def gwas(self, phenotype_file: str, filename: Optional[str] = None) -> pd.DataFrame:
grg_file = filename if filename is not None else self.__filename
if grg_file is None:
raise ValueError("Either pass in a GRG filename, or store an existing GRG filename.")
with tempfile.NamedTemporaryFile(suffix=".tsv") as fp:
try:
subprocess.run(
["grapp", "assoc", "-p", f"{phenotype_file}", "-o", fp.name, f"{grg_file}"],
check=True,
)
except FileNotFoundError as exc:
raise ImportError(
"GWAS support requires the optional dependency 'grapp'. "
"Install it with: pip install grapp"
) from exc
return pd.read_csv(fp.name, sep="\t")
def merge(self, combine_nodes: bool = False, *args) -> None:
# assert self.__mutable and isinstance(self.genotypes, pyg.MutableGRG), "GRG must be mutable"
for arg in args:
if not isinstance(arg, str):
raise TypeError("All merge inputs must be strings.")
# list of files, and combine_nodes
self.__genotypes.merge(list(args), combine_nodes)
#pep8 be damned
# if inplace: self.__genotypes = merged_data
# else : return merged_data
[docs]
def n_samples(self, ploidy = 2) -> int:
"""
Get number of samples from GRG. Diploid by default.
"""
return int(self.__genotypes.num_samples / ploidy)
def n_snps(self) -> int:
return self.__genotypes.num_mutations
def _sample_ids(self, n_samples: int, sample_prefix: str) -> np.ndarray:
default_ids = [f"{sample_prefix}_{idx}" for idx in range(n_samples)]
if self.__genotypes is None:
return np.asarray(default_ids, dtype=object)
has_individual_ids = bool(getattr(self.__genotypes, "has_individual_ids", False))
num_individuals = int(getattr(self.__genotypes, "num_individuals", 0))
if has_individual_ids and n_samples == num_individuals:
ids = []
for idx in range(n_samples):
try:
sample_id = str(self.__genotypes.get_individual_id(idx))
except RuntimeError:
sample_id = ""
ids.append(sample_id if sample_id else default_ids[idx])
else:
ids = default_ids
# Keep IDs unique for downstream writers.
seen = {}
unique_ids = []
for idx, sample_id in enumerate(ids):
count = seen.get(sample_id, 0)
unique_ids.append(sample_id if count == 0 else f"{sample_id}_{count}")
seen[sample_id] = count + 1
return np.asarray(unique_ids, dtype=object)
[docs]
def to_snpobject(
self,
sum_strands: bool = False,
chrom: str = ".",
sample_prefix: str = "sample",
):
"""
Convert the GRG to a dense SNPObject.
Notes:
- This materializes the full genotype matrix, so memory usage scales with
`num_mutations * num_samples`.
- For diploid GRGs and `sum_strands=False`, output has shape
`(n_snps, n_samples, 2)`.
- For `sum_strands=True`, output has shape `(n_snps, n_samples)` with
per-individual allele counts.
"""
from snputils.snp.genobj.snpobj import SNPObject
if self.__genotypes is None:
raise ValueError("Cannot convert to SNPObject: `genotypes` is None.")
grg = self.__genotypes
n_mutations = int(grg.num_mutations)
n_haplotypes = int(grg.num_samples)
ploidy = int(getattr(grg, "ploidy", 2))
if ploidy <= 0:
raise ValueError(f"Invalid ploidy in GRG: {ploidy}")
if n_haplotypes % ploidy != 0:
raise ValueError(
f"GRG has {n_haplotypes} haplotypes, not divisible by ploidy {ploidy}."
)
n_individuals = n_haplotypes // ploidy
chrom = str(chrom)
def _empty(shape):
return np.empty(shape, dtype=np.int8)
if sum_strands:
if n_mutations == 0:
genotypes = _empty((0, n_individuals))
elif ploidy == 1:
mutation_eye = np.eye(n_mutations, dtype=np.float64)
hap_matrix = pyg.matmul(grg, mutation_eye, pyg.TraversalDirection.DOWN)
genotypes = np.rint(hap_matrix).astype(np.int8, copy=False)
else:
mutation_eye = np.eye(n_mutations, dtype=np.float64)
diploid_matrix = pyg.matmul(
grg, mutation_eye, pyg.TraversalDirection.DOWN, by_individual=True
)
genotypes = np.rint(diploid_matrix).astype(np.int8, copy=False)
sample_ids = self._sample_ids(n_individuals, sample_prefix)
else:
if ploidy != 2:
raise ValueError(
"Phased SNPObject output requires diploid GRGs. "
"Use `sum_strands=True` for non-diploid data."
)
if n_mutations == 0:
genotypes = _empty((0, n_individuals, ploidy))
else:
mutation_eye = np.eye(n_mutations, dtype=np.float64)
hap_matrix = pyg.matmul(grg, mutation_eye, pyg.TraversalDirection.DOWN)
hap_matrix = np.rint(hap_matrix).astype(np.int8, copy=False)
genotypes = hap_matrix.reshape(n_mutations, n_individuals, ploidy)
sample_ids = self._sample_ids(n_individuals, sample_prefix)
variants_ref = np.empty(n_mutations, dtype=object)
variants_alt = np.empty(n_mutations, dtype=object)
variants_pos = np.empty(n_mutations, dtype=np.int64)
variants_id = np.empty(n_mutations, dtype=object)
for mut_id in range(n_mutations):
mutation = grg.get_mutation_by_id(mut_id)
position = int(round(float(mutation.position)))
ref = str(mutation.ref_allele) if str(mutation.ref_allele) else "."
alt = str(mutation.allele) if str(mutation.allele) else "."
variants_pos[mut_id] = position
variants_ref[mut_id] = ref
variants_alt[mut_id] = alt
variants_id[mut_id] = f"{chrom}:{position}"
variants_chrom = np.full(n_mutations, chrom, dtype=object)
variants_filter_pass = np.full(n_mutations, "PASS", dtype=object)
variants_qual = np.full(n_mutations, np.nan, dtype=np.float32)
return SNPObject(
genotypes=genotypes,
samples=sample_ids,
variants_ref=variants_ref,
variants_alt=variants_alt,
variants_chrom=variants_chrom,
variants_filter_pass=variants_filter_pass,
variants_id=variants_id,
variants_pos=variants_pos,
variants_qual=variants_qual,
)
[docs]
def copy(self) -> GRGObject:
"""
Create and return a copy of `self`.
Returns:
GRGObject:
A new instance of the current object.
"""
return copy.deepcopy(self)
[docs]
def keys(self) -> List[str]:
"""
Retrieve a list of public attribute names for `self`.
Returns:
list of str:
A list of attribute names, with internal name-mangling removed,
for easier reference to public attributes in the instance.
"""
return [attr.replace('_GRGObject__', '') for attr in vars(self)]
def to_grg(self, filename: str,
allow_simplify: bool = True):
pyg.save_grg(self.__genotypes, filename, allow_simplify)
[docs]
def save(self, filename: str, allow_simplify: bool = True):
"""Write the GRG to disk.
This is the object-style alias for :meth:`to_grg`, matching the save
method exposed by other snputils containers.
"""
self.to_grg(filename, allow_simplify=allow_simplify)