import logging
import pandas as pd
import numpy as np
from pathlib import Path
from typing import Optional, Sequence, Union
from snputils.snp.genobj import SNPObject
from snputils.snp.io.write._genotype_encoding import phased_to_hardcalls
from snputils.snp.io.write._plink import coerce_sex_codes
log = logging.getLogger(__name__)
[docs]
class BEDWriter:
"""Writes an object in bed/bim/fam formats in the specified output path.
Args:
snpobj: The SNPObject to be written.
file: The output file path.
"""
def __init__(self, snpobj: SNPObject, filename: str):
self.__snpobj = snpobj.copy()
self.__filename = Path(filename)
[docs]
def write(
self,
rename_missing_values: bool = True,
before: Union[int, float, str] = -1,
after: Union[int, float, str] = '.',
sample_phenotype: Optional[Union[np.ndarray, Sequence[Union[str, int, float]], str, int, float]] = None,
):
"""
Writes the SNPObject to bed/bim/fam formats.
Args:
rename_missing_values (bool, optional):
If True, renames potential missing values in `snpobj.genotypes` before writing.
Defaults to True.
before (int, float, or str, default=-1):
The current representation of missing values in `genotypes`. Common values might be -1, '.', or NaN.
Default is -1.
after (int, float, or str, default='.'):
The value that will replace `before`. Default is '.'.
sample_phenotype (optional): PLINK phenotype value per sample, or a scalar used for all samples.
Defaults to ``-9`` for all samples.
"""
# Save .bed file
if self.__filename.suffix != '.bed':
self.__filename = self.__filename.with_suffix('.bed')
log.info(f"Writing .bed file: {self.__filename}")
# pgenlib expects a numeric hardcall matrix in (samples, variants) order.
hardcalls = phased_to_hardcalls(
self.__snpobj.genotypes,
rename_missing_values=rename_missing_values,
before=before,
after=after,
)
genotype_matrix = np.asarray(hardcalls.T)
# Infer the number of samples and variants from the matrix
samples, variants = genotype_matrix.shape
with self.__filename.open("wb") as handle:
handle.write(bytes([0x6C, 0x1B, 0x01]))
for snp_i in range(variants):
for start in range(0, samples, 4):
byte = 0
for offset in range(4):
sample_i = start + offset
if sample_i >= samples:
code = 0b00
else:
dosage = int(genotype_matrix[sample_i, snp_i])
if dosage == -9:
code = 0b01
elif dosage == 0:
code = 0b11
elif dosage == 1:
code = 0b10
elif dosage == 2:
code = 0b00
else:
raise ValueError(f"Unexpected diploid dosage {dosage}")
byte |= code << (2 * offset)
handle.write(bytes([byte]))
log.info(f"Finished writing .bed file: {self.__filename}")
# Remove .bed from the file name
if self.__filename.suffix == '.bed':
self.__filename = self.__filename.with_suffix('')
# Save .fam file
log.info(f"Writing .fam file: {self.__filename}")
# Fill .fam file
fam_file = pd.DataFrame(columns=['fid', 'iid', 'father', 'mother', 'gender', 'trait'])
fam_file['iid'] = self.__snpobj.samples
fid = getattr(self.__snpobj, "sample_fid", None)
if fid is None:
fam_file['fid'] = self.__snpobj.samples
else:
fid_arr = np.asarray(fid)
if fid_arr.shape[0] != len(self.__snpobj.samples):
raise ValueError(
f"snpobj.sample_fid length ({fid_arr.shape[0]}) must match number of samples "
f"({len(self.__snpobj.samples)})."
)
fam_file['fid'] = fid_arr
fam_file['father'] = 0
fam_file['mother'] = 0
sample_sex = getattr(self.__snpobj, "sample_sex", None)
fam_file['gender'] = coerce_sex_codes(sample_sex, len(self.__snpobj.samples), missing_code="0")
fam_file['trait'] = self._coerce_phenotypes(sample_phenotype, len(self.__snpobj.samples))
# Save .fam file
fam_file.to_csv(self.__filename.with_suffix('.fam'), sep='\t', index=False, header=False)
log.info(f"Finished writing .fam file: {self.__filename}")
# Save .bim file
log.info(f"Writing .bim file: {self.__filename}")
# Fill .bim file
bim_file = pd.DataFrame(columns=['chrom', 'snp', 'cm', 'pos', 'a0', 'a1'])
bim_file['chrom'] = self.__snpobj.variants_chrom
bim_file['snp'] = self.__snpobj.variants_id
bim_file['cm'] = self._coerce_centimorgans(self.__snpobj.variants_cm, self.__snpobj.n_snps)
bim_file['pos'] = self.__snpobj.variants_pos
bim_file['a0'] = self.__snpobj.variants_alt
bim_file['a1'] = self.__snpobj.variants_ref
# Save .bim file
bim_file.to_csv(self.__filename.with_suffix('.bim'), sep='\t', index=False, header=False)
log.info(f"Finished writing .bim file: {self.__filename}")
@staticmethod
def _coerce_centimorgans(
variants_cm: Optional[Union[np.ndarray, Sequence[Union[str, int, float]]]],
n_variants: int,
) -> np.ndarray:
if variants_cm is None:
return np.zeros(n_variants)
cm = np.asarray(variants_cm)
if cm.shape[0] != n_variants:
raise ValueError(f"variants_cm length ({cm.shape[0]}) must match number of variants ({n_variants}).")
return cm
@staticmethod
def _coerce_phenotypes(
sample_phenotype: Optional[Union[np.ndarray, Sequence[Union[str, int, float]], str, int, float]],
n_samples: int,
) -> np.ndarray:
if sample_phenotype is None:
return np.repeat("-9", n_samples)
phenotype = np.asarray(sample_phenotype)
if phenotype.ndim == 0:
return np.repeat(str(phenotype.item()), n_samples)
if phenotype.shape[0] != n_samples:
raise ValueError(
f"sample_phenotype length ({phenotype.shape[0]}) must match number of samples ({n_samples})."
)
return phenotype.astype(str)