Source code for snputils.snp.io.write.vcf

import logging
import gzip
from pathlib import Path
from typing import Optional, Sequence, Union

import numpy as np

from snputils.snp.genobj import SNPObject

log = logging.getLogger(__name__)


def _normalize_info_values(
    data: SNPObject,
    variants_info: Optional[Sequence[str]],
) -> np.ndarray:
    if variants_info is not None:
        values = np.asarray(variants_info, dtype=object)
    elif data.variants_info is not None and len(np.asarray(data.variants_info)) == data.n_snps:
        values = np.asarray(data.variants_info, dtype=object)
    else:
        values = np.full(data.n_snps, ".", dtype=object)

    if values.shape[0] != data.n_snps:
        raise ValueError("variants_info length must match the number of variants.")

    return np.asarray(
        ["." if value is None or str(value) == "" else str(value) for value in values],
        dtype=object,
    )


def _normalize_qual_values(data: SNPObject) -> np.ndarray:
    values = data.variants_qual
    if values is None or len(np.asarray(values)) != data.n_snps:
        return np.full(data.n_snps, ".", dtype=object)
    out = []
    for value in np.asarray(values, dtype=object):
        if value is None:
            out.append(".")
        elif isinstance(value, (float, np.floating)) and np.isnan(value):
            out.append(".")
        else:
            text = str(value)
            out.append("." if text == "" else text)
    return np.asarray(out, dtype=object)


def _normalize_filter_values(data: SNPObject) -> np.ndarray:
    values = data.variants_filter_pass
    if values is None or len(np.asarray(values)) != data.n_snps:
        return np.full(data.n_snps, "PASS", dtype=object)

    arr = np.asarray(values)
    if arr.dtype == np.bool_:
        return np.where(arr, "PASS", ".").astype(object)

    out = []
    for value in np.asarray(values, dtype=object):
        if value is None:
            out.append(".")
        else:
            text = str(value)
            out.append("." if text == "" else text)
    return np.asarray(out, dtype=object)


[docs] class VCFWriter: """ A writer class for exporting SNP data from a `snputils.snp.genobj.SNPObject` into an `.vcf` file. Supports sampleless VCF writes (annotation-only) when the input `SNPObject` has no samples. """ def __init__(self, snpobj: SNPObject, filename: str, n_jobs: int = -1, phased: bool = False): """ Args: snpobj (SNPObject): A SNPObject instance. file (str or pathlib.Path): Path to the file where the data will be saved. It should end with `.vcf`. If the provided path does not have this extension, the `.vcf` extension will be appended. n_jobs: Number of jobs to run in parallel. - `None`: use 1 job unless within a `joblib.parallel_backend` context. - `-1`: use all available processors. - Any other integer: use the specified number of jobs. phased: If True, genotype data is written in "maternal|paternal" format. If False, genotype data is written in "maternal/paternal" format. """ del n_jobs self.__snpobj = snpobj self.__filename = Path(filename) self.__phased = phased
[docs] def write( self, chrom_partition: bool = False, rename_missing_values: bool = True, before: Union[int, float, str] = -1, after: Union[int, float, str] = '.', variants_info: Optional[Sequence[str]] = None, ): """ Writes the SNP data to VCF file(s). If writing a sampleless VCF, the genotypes array must be an empty array with a variant axis. Args: chrom_partition (bool, optional): If True, individual VCF files are generated for each chromosome. If False, a single VCF file containing data for all chromosomes is created. Defaults to False. rename_missing_values (bool, optional): If True, renames potential missing values in `snpobj.genotypes` before writing. Defaults to True. before (int, float, or str, default=-1): The current representation of missing values in `genotypes`. Common values might be -1, '.', or NaN. Default is -1. after (int, float, or str, default='.'): The value that will replace `before`. Default is '.'. variants_info (sequence of str, optional): Per-variant INFO column values (e.g. ``["END=2000", "END=3000"]``). Length must match variant count. When provided, a ##INFO header line for END is written if any value contains ``END=``. """ self.__chrom_partition = chrom_partition file_extensions = (".vcf", ".bcf") suffixes = self.__filename.suffixes if len(suffixes) >= 2 and suffixes[-2:] == [".vcf", ".gz"]: self.__file_extension = ".vcf.gz" self.__filename = self.__filename.with_suffix("").with_suffix("") elif self.__filename.suffix in file_extensions: self.__file_extension = self.__filename.suffix self.__filename = self.__filename.with_suffix('') else: self.__file_extension = ".vcf" # Optionally rename potential missing values in `snpobj.genotypes` before writing if rename_missing_values: self.__snpobj.rename_missings(before=before, after=after, inplace=True) data = self.__snpobj if self.__chrom_partition: chroms = data.unique_chrom for chrom in chroms: data_chrom = data.filter_variants(chrom=chrom, inplace=False) if variants_info is not None: mask = data.variants_chrom == chrom info_chrom = [variants_info[i] for i in np.where(mask)[0]] else: info_chrom = None log.debug(f'Storing chromosome {chrom}') self._write_chromosome_data(chrom, data_chrom, info_chrom) else: self._write_chromosome_data("All", data, variants_info)
def _write_chromosome_data( self, chrom, data_chrom, variants_info: Optional[Sequence[str]] = None ): """ Writes the SNP data for a specific chromosome to a VCF file. Args: chrom: The chromosome name. data_chrom: The SNPObject instance containing the data for the chromosome. variants_info: Optional per-variant INFO strings; length must match variant count. """ genotypes = np.asarray(data_chrom.genotypes) n_windows = data_chrom.n_snps n_samples = data_chrom.n_samples has_samples = n_samples > 0 if has_samples: if genotypes.ndim != 3 or genotypes.shape[2] != 2: raise ValueError("VCFWriter requires diploid genotype arrays with shape (n_variants, n_samples, 2).") elif genotypes.ndim not in (2, 3): raise ValueError("Sampleless VCF writes require an empty genotype array with a variant axis.") info_values = _normalize_info_values(data_chrom, variants_info) qual_values = _normalize_qual_values(data_chrom) filter_values = _normalize_filter_values(data_chrom) if chrom == "All": file = self.__filename.with_suffix(self.__file_extension) else: file = self.__filename.parent / f"{self.__filename.stem}_{chrom}{self.__file_extension}" if self.__file_extension == ".vcf.gz": out = gzip.open(file, "wt", encoding="utf-8") else: out = open(file, "w") out.write("##fileformat=VCFv4.1\n") if has_samples: out.write('##FORMAT=<ID=GT,Number=1,Type=String,Description="Phased Genotype">\n') if any("END=" in s for s in info_values): out.write('##INFO=<ID=END,Number=1,Type=Integer,Description="End position of the segment">\n') for c in set(data_chrom.variants_chrom): out.write(f"##contig=<ID={c}>\n") cols = ["#CHROM","POS","ID","REF","ALT","QUAL","FILTER","INFO"] if has_samples: cols += ["FORMAT"] + list(data_chrom.samples) out.write("\t".join(cols) + "\n") sep = "|" if self.__phased else "/" for i in range(n_windows): chrom_val = data_chrom.variants_chrom[i] pos = data_chrom.variants_pos[i] vid = data_chrom.variants_id[i] ref = data_chrom.variants_ref[i] alt = data_chrom.variants_alt[i] fields = [ str(chrom_val), str(pos), str(vid), str(ref), str(alt), str(qual_values[i]), str(filter_values[i]), str(info_values[i]), ] if has_samples: row = genotypes[i] sample_fields = [ f"{row[s,0]}{sep}{row[s,1]}" for s in range(n_samples) ] fields.extend(["GT", *sample_fields]) line = "\t".join(fields) out.write(line + "\n") out.close()