Source code for snputils.snp.io.write.bcf

import logging
import gzip
import struct
import re
from pathlib import Path
from typing import Optional, Sequence, Union

import numpy as np

from snputils.snp.genobj import SNPObject

log = logging.getLogger(__name__)

def _encode_typed_string(s: str) -> bytes:
    if not s:
        return b"\x07"
    s_bytes = s.encode("utf-8")
    n = len(s_bytes)
    if n < 15:
        return bytes([(n << 4) | 7]) + s_bytes
    if n <= 127:
        len_enc = b"\x11" + bytes([n])
    elif n <= 32767:
        len_enc = b"\x12" + struct.pack("<h", n)
    else:
        len_enc = b"\x13" + struct.pack("<i", n)
    return b"\xf7" + len_enc + s_bytes

def _encode_typed_int_list(ints: list[int]) -> bytes:
    n = len(ints)
    if n == 0:
        return b"\x01"
    max_val = max(ints)
    min_val = min(ints)
    if min_val >= -120 and max_val <= 127:
        type_code = 1
        val_bytes = bytes([x & 0xFF for x in ints])
    elif min_val >= -32760 and max_val <= 32767:
        type_code = 2
        val_bytes = b"".join(struct.pack("<h", x) for x in ints)
    else:
        type_code = 3
        val_bytes = b"".join(struct.pack("<i", x) for x in ints)

    if n < 15:
        return bytes([(n << 4) | type_code]) + val_bytes
    if n <= 127:
        len_enc = b"\x11" + bytes([n])
    elif n <= 32767:
        len_enc = b"\x12" + struct.pack("<h", n)
    else:
        len_enc = b"\x13" + struct.pack("<i", n)
    return bytes([(15 << 4) | type_code]) + len_enc + val_bytes

[docs] class BCFWriter: """ A writer class for exporting SNP data from a `snputils.snp.genobj.SNPObject` into a `.bcf` file. """ def __init__(self, snpobj: SNPObject, filename: str, n_jobs: int = -1, phased: bool = False): """ Args: snpobj (SNPObject): A SNPObject instance. filename (str or pathlib.Path): Path to the file where the data will be saved. It should end with `.bcf`. If the provided path does not have this extension, the `.bcf` extension will be appended. n_jobs: Number of jobs to run in parallel. Unused, included for API consistency. phased: If True, genotype data is written in phased format. If False, genotype data is written in unphased format. """ del n_jobs self.__snpobj = snpobj self.__filename = Path(filename) self.__phased = phased
[docs] def write( self, chrom_partition: bool = False, rename_missing_values: bool = True, before: Union[int, float, str] = -1, after: Union[int, float, str] = '.', variants_info: Optional[Sequence[str]] = None, ): """ Writes the SNP data to BCF file(s). Args: chrom_partition (bool, optional): If True, individual BCF files are generated for each chromosome. If False, a single BCF file containing data for all chromosomes is created. Defaults to False. rename_missing_values (bool, optional): If True, renames potential missing values in `snpobj.genotypes` before writing. Defaults to True. before (int, float, or str, default=-1): The current representation of missing values in `genotypes`. Common values might be -1, '.', or NaN. Default is -1. after (int, float, or str, default='.'): The value that will replace `before`. Default is '.'. variants_info (sequence of str, optional): Per-variant INFO column values (e.g. ``["END=2000", "END=3000"]``). Length must match variant count. When provided, a ##INFO header line for END is written if any value contains ``END=``. """ self.__chrom_partition = chrom_partition if self.__filename.suffix != ".bcf": self.__filename = self.__filename.with_suffix(".bcf") # BCF binary format requires integer genotypes (where missing is -1), # so we do not rename missing values to string representations. pass data = self.__snpobj if self.__chrom_partition: chroms = data.unique_chrom for chrom in chroms: data_chrom = data.filter_variants(chrom=chrom, inplace=False) if variants_info is not None: mask = data.variants_chrom == chrom info_chrom = [variants_info[i] for i in np.where(mask)[0]] else: info_chrom = None log.debug(f'Storing chromosome {chrom}') self._write_chromosome_data(chrom, data_chrom, info_chrom) else: self._write_chromosome_data("All", data, variants_info)
def _write_chromosome_data( self, chrom: str, data_chrom: SNPObject, variants_info: Optional[Sequence[str]] = None ): n_records = data_chrom.genotypes.shape[0] if data_chrom.genotypes is not None else len(data_chrom.variants_pos) n_samples = len(data_chrom.samples) if data_chrom.samples is not None else 0 if chrom == "All": file = self.__filename else: file = self.__filename.parent / f"{self.__filename.stem}_{chrom}.bcf" # Unique chromosomes/contigs list unique_chroms = [] seen = set() for c in data_chrom.variants_chrom: if c not in seen: seen.add(c) unique_chroms.append(c) # Build header lines header_lines = [ "##fileformat=VCFv4.3", '##FILTER=<ID=PASS,Description="All filters passed",IDX=0>', '##FORMAT=<ID=GT,Number=1,Type=String,Description="Phased Genotype",IDX=1>' ] if variants_info is not None and any("END=" in s for s in variants_info): header_lines.append('##INFO=<ID=END,Number=1,Type=Integer,Description="End position of the segment",IDX=2>') for c in unique_chroms: header_lines.append(f"##contig=<ID={c}>") cols = ["#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO", "FORMAT"] + [str(sample) for sample in data_chrom.samples] header_lines.append("\t".join(cols)) header_text = "\n".join(header_lines) + "\n" header_bytes = header_text.encode("utf-8") + b"\0" header_len = len(header_bytes) with gzip.open(file, "wb") as out: # 1. Magic out.write(b"BCF\x02\x02") # 2. Header length out.write(struct.pack("<I", header_len)) # 3. Header text out.write(header_bytes) # 4. Records for i in range(n_records): chrom_val = data_chrom.variants_chrom[i] chrom_id = unique_chroms.index(chrom_val) pos = int(data_chrom.variants_pos[i]) - 1 ref = data_chrom.variants_ref[i] rlen = len(ref) if data_chrom.variants_qual is not None and not np.isnan(data_chrom.variants_qual[i]): qual_val = float(data_chrom.variants_qual[i]) else: # Reinterpret 0x7F800001 (BCF missing float sentinel bits) as a float qual_val = struct.unpack("<f", b"\x01\x00\x80\x7f")[0] alt_str = data_chrom.variants_alt[i] if not alt_str or alt_str == ".": alts = [] else: alts = alt_str.split(",") n_alleles = 1 + len(alts) # Parse END info n_info = 0 info_bytes = b"" if variants_info is not None: info_str = variants_info[i] if info_str and "END=" in info_str: match = re.search(r"END=(\d+)", info_str) if match: end_val = int(match.group(1)) n_info = 1 # INFO key: END (IDX = 2) key_bytes = _encode_typed_int_list([2]) val_bytes = _encode_typed_int_list([end_val]) info_bytes = key_bytes + val_bytes # Genotypes / FORMAT if data_chrom.genotypes is not None: n_fmt = 1 gt_array = np.zeros((n_samples, 2), dtype=np.uint8) row = data_chrom.genotypes[i] missing = row < 0 gt_array[~missing] = (row[~missing] + 1) << 1 if self.__phased: gt_array[~missing[:, 1], 1] |= 1 # FORMAT key index: GT (IDX = 1) fmt_key_bytes = _encode_typed_int_list([1]) # FORMAT value descriptor: (2 values per sample, type 1 int8) fmt_desc_bytes = b"\x21" indiv_bytes = fmt_key_bytes + fmt_desc_bytes + gt_array.tobytes() else: n_fmt = 0 indiv_bytes = b"" # Shared section fields id_bytes = _encode_typed_string(data_chrom.variants_id[i] if data_chrom.variants_id[i] != "." else "") ref_bytes = _encode_typed_string(ref) alt_bytes = b"".join(_encode_typed_string(alt) for alt in alts) filter_bytes = _encode_typed_int_list([0]) # PASS (IDX = 0) shared_bytes = struct.pack( "<iiIfII", chrom_id, pos, rlen, qual_val, (n_alleles << 16) | n_info, (n_fmt << 24) | n_samples ) + id_bytes + ref_bytes + alt_bytes + filter_bytes + info_bytes l_shared = len(shared_bytes) l_indiv = len(indiv_bytes) out.write(struct.pack("<II", l_shared, l_indiv)) out.write(shared_bytes) out.write(indiv_bytes)