import logging
import pandas as pd
import numpy as np
from pathlib import Path
from typing import Optional, Sequence, Union
from snputils.snp.genobj import SNPObject
from snputils.snp.io.write._plink import coerce_sex_codes
log = logging.getLogger(__name__)
[docs]
class BEDWriter:
"""Writes an object in bed/bim/fam formats in the specified output path.
Args:
snpobj: The SNPObject to be written.
file: The output file path.
"""
def __init__(self, snpobj: SNPObject, filename: str):
self.__snpobj = snpobj.copy()
self.__filename = Path(filename)
[docs]
def write(
self,
rename_missing_values: bool = True,
before: Union[int, float, str] = -1,
after: Union[int, float, str] = '.',
sample_phenotype: Optional[Union[np.ndarray, Sequence[Union[str, int, float]], str, int, float]] = None,
):
"""
Writes the SNPObject to bed/bim/fam formats.
Args:
rename_missing_values (bool, optional):
If True, renames potential missing values in `snpobj.calldata_gt` before writing.
Defaults to True.
before (int, float, or str, default=-1):
The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
Default is -1.
after (int, float, or str, default='.'):
The value that will replace `before`. Default is '.'.
sample_phenotype (optional): PLINK phenotype value per sample, or a scalar used for all samples.
Defaults to ``-9`` for all samples.
"""
# Save .bed file
if self.__filename.suffix != '.bed':
self.__filename = self.__filename.with_suffix('.bed')
log.info(f"Writing .bed file: {self.__filename}")
# Optionally rename potential missing values in `snpobj.calldata_gt` before writing
if rename_missing_values:
self.__snpobj.rename_missings(before=before, after=after, inplace=True)
# PLINK BED stores variants-major packed 2-bit hard calls. Convert to an
# individuals x variants dosage matrix first, then pack each variant row.
if len(self.__snpobj.calldata_gt.shape) == 3:
genotype_matrix = self.__snpobj.calldata_gt.transpose(1, 0, 2).sum(axis=2, dtype=np.int8)
elif len(self.__snpobj.calldata_gt.shape) == 2:
genotype_matrix = self.__snpobj.calldata_gt.T
else:
raise ValueError("`calldata_gt` must be a 2D or 3D array.")
genotype_matrix = np.asarray(genotype_matrix)
if not np.issubdtype(genotype_matrix.dtype, np.number):
genotype_matrix = np.where(genotype_matrix == ".", -9, genotype_matrix)
genotype_matrix = genotype_matrix.astype(np.int8, copy=False)
genotype_matrix = np.where(genotype_matrix < 0, -9, genotype_matrix)
# Infer the number of samples and variants from the matrix
samples, variants = genotype_matrix.shape
with self.__filename.open("wb") as handle:
handle.write(bytes([0x6C, 0x1B, 0x01]))
for snp_i in range(variants):
for start in range(0, samples, 4):
byte = 0
for offset in range(4):
sample_i = start + offset
if sample_i >= samples:
code = 0b00
else:
dosage = int(genotype_matrix[sample_i, snp_i])
if dosage == -9:
code = 0b01
elif dosage == 0:
code = 0b11
elif dosage == 1:
code = 0b10
elif dosage == 2:
code = 0b00
else:
raise ValueError(f"Unexpected diploid dosage {dosage}")
byte |= code << (2 * offset)
handle.write(bytes([byte]))
log.info(f"Finished writing .bed file: {self.__filename}")
# Remove .bed from the file name
if self.__filename.suffix == '.bed':
self.__filename = self.__filename.with_suffix('')
# Save .fam file
log.info(f"Writing .fam file: {self.__filename}")
# Fill .fam file
fam_file = pd.DataFrame(columns=['fid', 'iid', 'father', 'mother', 'gender', 'trait'])
fam_file['iid'] = self.__snpobj.samples
fid = getattr(self.__snpobj, "sample_fid", None)
if fid is None:
fam_file['fid'] = self.__snpobj.samples
else:
fid_arr = np.asarray(fid)
if fid_arr.shape[0] != len(self.__snpobj.samples):
raise ValueError(
f"snpobj.sample_fid length ({fid_arr.shape[0]}) must match number of samples "
f"({len(self.__snpobj.samples)})."
)
fam_file['fid'] = fid_arr
fam_file['father'] = 0
fam_file['mother'] = 0
sample_sex = getattr(self.__snpobj, "sample_sex", None)
fam_file['gender'] = coerce_sex_codes(sample_sex, len(self.__snpobj.samples), missing_code="0")
fam_file['trait'] = self._coerce_phenotypes(sample_phenotype, len(self.__snpobj.samples))
# Save .fam file
fam_file.to_csv(self.__filename.with_suffix('.fam'), sep='\t', index=False, header=False)
log.info(f"Finished writing .fam file: {self.__filename}")
# Save .bim file
log.info(f"Writing .bim file: {self.__filename}")
# Fill .bim file
bim_file = pd.DataFrame(columns=['chrom', 'snp', 'cm', 'pos', 'a0', 'a1'])
bim_file['chrom'] = self.__snpobj.variants_chrom
bim_file['snp'] = self.__snpobj.variants_id
bim_file['cm'] = self._coerce_centimorgans(self.__snpobj.variants_cm, self.__snpobj.n_snps)
bim_file['pos'] = self.__snpobj.variants_pos
bim_file['a0'] = self.__snpobj.variants_alt
bim_file['a1'] = self.__snpobj.variants_ref
# Save .bim file
bim_file.to_csv(self.__filename.with_suffix('.bim'), sep='\t', index=False, header=False)
log.info(f"Finished writing .bim file: {self.__filename}")
@staticmethod
def _coerce_centimorgans(
variants_cm: Optional[Union[np.ndarray, Sequence[Union[str, int, float]]]],
n_variants: int,
) -> np.ndarray:
if variants_cm is None:
return np.zeros(n_variants)
cm = np.asarray(variants_cm)
if cm.shape[0] != n_variants:
raise ValueError(f"variants_cm length ({cm.shape[0]}) must match number of variants ({n_variants}).")
return cm
@staticmethod
def _coerce_phenotypes(
sample_phenotype: Optional[Union[np.ndarray, Sequence[Union[str, int, float]], str, int, float]],
n_samples: int,
) -> np.ndarray:
if sample_phenotype is None:
return np.repeat("-9", n_samples)
phenotype = np.asarray(sample_phenotype)
if phenotype.ndim == 0:
return np.repeat(str(phenotype.item()), n_samples)
if phenotype.shape[0] != n_samples:
raise ValueError(
f"sample_phenotype length ({phenotype.shape[0]}) must match number of samples ({n_samples})."
)
return phenotype.astype(str)