Source code for snputils.snp.io.write.vcf

import logging
import joblib
from pathlib import Path
from typing import Optional, Sequence, Union

import numpy as np

from snputils.snp.genobj import SNPObject

log = logging.getLogger(__name__)


[docs] class VCFWriter: """ A writer class for exporting SNP data from a `snputils.snp.genobj.SNPObject` into an `.vcf` file. """ def __init__(self, snpobj: SNPObject, filename: str, n_jobs: int = -1, phased: bool = False): """ Args: snpobj (SNPObject): A SNPObject instance. file (str or pathlib.Path): Path to the file where the data will be saved. It should end with `.vcf`. If the provided path does not have this extension, the `.vcf` extension will be appended. n_jobs: Number of jobs to run in parallel. - `None`: use 1 job unless within a `joblib.parallel_backend` context. - `-1`: use all available processors. - Any other integer: use the specified number of jobs. phased: If True, genotype data is written in "maternal|paternal" format. If False, genotype data is written in "maternal/paternal" format. """ self.__snpobj = snpobj self.__filename = Path(filename) self.__n_jobs = n_jobs self.__phased = phased
[docs] def write( self, chrom_partition: bool = False, rename_missing_values: bool = True, before: Union[int, float, str] = -1, after: Union[int, float, str] = '.', variants_info: Optional[Sequence[str]] = None, ): """ Writes the SNP data to VCF file(s). Args: chrom_partition (bool, optional): If True, individual VCF files are generated for each chromosome. If False, a single VCF file containing data for all chromosomes is created. Defaults to False. rename_missing_values (bool, optional): If True, renames potential missing values in `snpobj.calldata_gt` before writing. Defaults to True. before (int, float, or str, default=-1): The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. Default is -1. after (int, float, or str, default='.'): The value that will replace `before`. Default is '.'. variants_info (sequence of str, optional): Per-variant INFO column values (e.g. ``["END=2000", "END=3000"]``). Length must match variant count. When provided, a ##INFO header line for END is written if any value contains ``END=``. """ self.__chrom_partition = chrom_partition file_extensions = (".vcf", ".bcf") if self.__filename.suffix in file_extensions: self.__file_extension = self.__filename.suffix self.__filename = self.__filename.with_suffix('') else: self.__file_extension = ".vcf" # Optionally rename potential missing values in `snpobj.calldata_gt` before writing if rename_missing_values: self.__snpobj.rename_missings(before=before, after=after, inplace=True) data = self.__snpobj if self.__chrom_partition: chroms = data.unique_chrom for chrom in chroms: data_chrom = data.filter_variants(chrom=chrom, inplace=False) if variants_info is not None: mask = data.variants_chrom == chrom info_chrom = [variants_info[i] for i in np.where(mask)[0]] else: info_chrom = None log.debug(f'Storing chromosome {chrom}') self._write_chromosome_data(chrom, data_chrom, info_chrom) else: self._write_chromosome_data("All", data, variants_info)
def _write_chromosome_data( self, chrom, data_chrom, variants_info: Optional[Sequence[str]] = None ): """ Writes the SNP data for a specific chromosome to a VCF file. Args: chrom: The chromosome name. data_chrom: The SNPObject instance containing the data for the chromosome. variants_info: Optional per-variant INFO strings; length must match variant count. """ npy3 = data_chrom.calldata_gt n_windows, n_samples, _ = npy3.shape if chrom == "All": file = self.__filename.with_suffix(self.__file_extension) else: file = self.__filename.parent / f"{self.__filename.stem}_{chrom}{self.__file_extension}" out = open(file, "w") out.write("##fileformat=VCFv4.1\n") out.write('##FORMAT=<ID=GT,Number=1,Type=String,Description="Phased Genotype">\n') if variants_info is not None and any("END=" in s for s in variants_info): out.write('##INFO=<ID=END,Number=1,Type=Integer,Description="End position of the segment">\n') for c in set(data_chrom.variants_chrom): out.write(f"##contig=<ID={c}>\n") cols = ["#CHROM","POS","ID","REF","ALT","QUAL","FILTER","INFO","FORMAT"] + list(data_chrom.samples) out.write("\t".join(cols) + "\n") sep = "|" if self.__phased else "/" for i in range(n_windows): chrom_val = data_chrom.variants_chrom[i] pos = data_chrom.variants_pos[i] vid = data_chrom.variants_id[i] ref = data_chrom.variants_ref[i] alt = data_chrom.variants_alt[i] info_str = variants_info[i] if variants_info is not None else "." row = npy3[i] genotypes = [ f"{row[s,0]}{sep}{row[s,1]}" for s in range(n_samples) ] line = "\t".join([ str(chrom_val), str(pos), vid, ref, alt, ".", "PASS", info_str, "GT", *genotypes ]) out.write(line + "\n") out.close()
def process_genotype(npy, i, n_snps, phased): """ Process the genotype data for a particular individual in "maternal|paternal" format for each SNP. Args: npy: Array containing genotype data for multiple individuals. i: Index of the individual to process. n_snps: Number of SNPs. Returns: **genotype**: List with "maternal|paternal" for each SNP. """ # Get the genotype data for the specified individual's maternal and paternal SNPs maternal = npy[i*2, :].astype(str) # maternal strand is the even row paternal = npy[i*2 + 1, :].astype(str) # paternal strand is the odd row # Create a list with "maternal|paternal" format for each SNP sep = "|" if phased else "/" lst = [maternal, [sep] * n_snps, paternal] genotype = list(map(''.join, zip(*lst))) return genotype