SNPObject Allele Frequency Demo

This notebook shows:

  • Cohort-level allele frequencies

  • Grouped allele frequencies (using sample labels)

  • Ancestry-specific allele frequencies (using SNP-level LAI)

  • Chunked/streaming allele frequencies via allele_freq_stream

  • Optional memory-efficient streaming with BED, PGEN, and VCF (and MSP for ancestry-specific AFs)

import os
import sys
from pathlib import Path

import numpy as np

repo_root = os.path.abspath("..")
if repo_root not in sys.path:
    sys.path.append(repo_root)

from snputils.snp.genobj.snpobj import SNPObject
from snputils.snp.io.read import BEDReader, PGENReader
from snputils.snp.io.read.vcf import VCFReaderPolars
from snputils.stats import allele_freq_stream
from IPython.display import display

Synthetic Setup (7 SNPs x 5 Individuals)

calldata_gt is 3D here: (n_snps, n_samples, 2) with haplotype values in {0, 1} and missing as -1.

samples = np.array(["S1", "S2", "S3", "S4", "S5"])
labels = np.array(["POP_A", "POP_A", "POP_B", "POP_B", "POP_C"])

gt_3d = np.array(
    [
        [[0, 0], [0, 1], [1, 1], [0, 1], [1, 0]],
        [[0, 1], [1, 1], [0, 0], [1, 0], [0, 0]],
        [[1, 1], [1, 0], [0, 0], [0, 1], [1, 1]],
        [[0, 0], [0, 0], [0, 1], [1, 1], [-1, -1]],
        [[1, 0], [1, 0], [1, 1], [0, 0], [0, 1]],
        [[0, 1], [-1, -1], [0, 0], [1, 0], [1, 1]],
        [[1, 1], [0, 1], [0, 0], [0, 0], [0, 1]],
    ],
    dtype=float,
)

lai_3d = np.array(
    [
        [[0, 0], [0, 1], [1, 1], [1, 0], [0, 1]],
        [[0, 1], [1, 1], [0, 0], [1, 0], [0, 0]],
        [[1, 1], [1, 0], [0, 0], [0, 1], [1, 1]],
        [[0, 0], [0, 0], [1, 0], [1, 1], [0, 1]],
        [[1, 0], [1, 0], [1, 1], [0, 0], [0, 1]],
        [[0, 1], [1, 1], [0, 0], [1, 0], [1, 1]],
        [[1, 1], [0, 1], [0, 0], [0, 0], [0, 1]],
    ],
    dtype=int,
)

print("gt_3d shape:", gt_3d.shape)
print("lai_3d shape:", lai_3d.shape)
gt_3d shape: (7, 5, 2)
lai_3d shape: (7, 5, 2)

1) Cohort-Level Allele Frequency

Default behavior (sample_labels=None) computes one AF value per SNP across all samples.

snp = SNPObject(calldata_gt=gt_3d, samples=samples)
af, called = snp.allele_freq(return_counts=True)

print("Cohort AF:")
print(af)
print("Called haplotypes per SNP:")
print(called)
Cohort AF:
[0.5   0.4   0.6   0.375 0.5   0.5   0.4  ]
Called haplotypes per SNP:
[10 10 10  8 10  8 10]
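The cohort AF above can be cross-checked with plain NumPy: mask out the missing (-1) haplotypes, sum the remaining allele values per SNP, and divide by the called-haplotype count. A minimal sketch, independent of snputils (this is not the library's internal implementation, just the arithmetic it performs):

```python
import numpy as np

# Same toy genotypes as above; -1 marks missing haplotypes.
gt_3d = np.array(
    [
        [[0, 0], [0, 1], [1, 1], [0, 1], [1, 0]],
        [[0, 1], [1, 1], [0, 0], [1, 0], [0, 0]],
        [[1, 1], [1, 0], [0, 0], [0, 1], [1, 1]],
        [[0, 0], [0, 0], [0, 1], [1, 1], [-1, -1]],
        [[1, 0], [1, 0], [1, 1], [0, 0], [0, 1]],
        [[0, 1], [-1, -1], [0, 0], [1, 0], [1, 1]],
        [[1, 1], [0, 1], [0, 0], [0, 0], [0, 1]],
    ],
    dtype=float,
)

valid = gt_3d >= 0                                # mask missing (-1) haplotypes
alt = np.where(valid, gt_3d, 0).sum(axis=(1, 2))  # alt-allele count per SNP
called = valid.sum(axis=(1, 2))                   # called haplotypes per SNP
af = alt / called

print(af)      # [0.5 0.4 0.6 0.375 0.5 0.5 0.4]
print(called)  # [10 10 10 8 10 8 10]
```

This reproduces both arrays printed above, including the reduced denominators (8 instead of 10) at the two SNPs with a fully missing sample.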

2) Grouped Allele Frequencies

Pass sample_labels to compute per-group AFs.

af_by_group, called_by_group = snp.allele_freq(
    sample_labels=labels,
    return_counts=True,
    as_dataframe=True,
)

print("Grouped AF (DataFrame):")
display(af_by_group)
print("Grouped called haplotypes (DataFrame):")
display(called_by_group)
Grouped AF (DataFrame):
POP_A POP_B POP_C
0 0.25 0.75 0.5
1 0.75 0.25 0.0
2 0.75 0.25 1.0
3 0.00 0.75 NaN
4 0.50 0.50 0.5
5 0.50 0.25 1.0
6 0.75 0.00 0.5
Grouped called haplotypes (DataFrame):
POP_A POP_B POP_C
0 4 4 2
1 4 4 2
2 4 4 2
3 4 4 0
4 4 4 2
5 2 4 2
6 4 4 2
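Grouped AFs are the same computation restricted to each label's sample columns, with a 0-called group yielding NaN. A rough standalone NumPy equivalent, shown on just the first and fourth SNPs from the toy data (a sketch, not the snputils internals):

```python
import numpy as np

# SNP rows 0 and 3 from the toy data; -1 marks missing haplotypes.
gt = np.array(
    [
        [[0, 0], [0, 1], [1, 1], [0, 1], [1, 0]],
        [[0, 0], [0, 0], [0, 1], [1, 1], [-1, -1]],
    ],
    dtype=float,
)
labels = np.array(["POP_A", "POP_A", "POP_B", "POP_B", "POP_C"])

for group in np.unique(labels):
    sub = gt[:, labels == group, :]         # select the group's samples
    valid = sub >= 0
    called = valid.sum(axis=(1, 2))
    with np.errstate(invalid="ignore"):     # 0 called -> 0/0 -> NaN, as in the DataFrame
        af = np.where(valid, sub, 0).sum(axis=(1, 2)) / called
    print(group, af, called)
```

POP_C at the second SNP has both haplotypes missing, so its denominator is 0 and the AF comes out NaN, matching row 3 of the grouped DataFrame above.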

3) Ancestry-Specific Allele Frequencies

With ancestry=..., only haplotypes assigned to that ancestry are counted.

Here, ancestry code 1 is selected.

snp_lai = SNPObject(calldata_gt=gt_3d, calldata_lai=lai_3d, samples=samples)

af_anc, called_anc = snp_lai.allele_freq(ancestry=1, return_counts=True)
print("Ancestry-specific cohort AF (ancestry=1):")
print(af_anc)
print("Ancestry-specific called haplotypes:")
print(called_anc)

af_anc_group, called_anc_group = snp_lai.allele_freq(
    sample_labels=labels,
    ancestry=1,
    return_counts=True,
    as_dataframe=True,
)
print("Ancestry-specific grouped AF (DataFrame):")
display(af_anc_group)
print("Ancestry-specific grouped called haplotypes (DataFrame):")
display(called_anc_group)
Ancestry-specific cohort AF (ancestry=1):
[0.6        1.         1.         0.66666667 1.         1.
 1.        ]
Ancestry-specific called haplotypes:
[5 4 6 3 5 4 4]
Ancestry-specific grouped AF (DataFrame):
POP_A POP_B POP_C
0 1.0 0.666667 0.0
1 1.0 1.000000 NaN
2 1.0 1.000000 1.0
3 NaN 0.666667 NaN
4 1.0 1.000000 1.0
5 1.0 1.000000 1.0
6 1.0 NaN 1.0
Ancestry-specific grouped called haplotypes (DataFrame):
POP_A POP_B POP_C
0 1 3 1
1 3 1 0
2 3 1 2
3 0 3 0
4 2 2 1
5 1 1 2
6 3 0 1
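The ancestry mask is conceptually simple: a haplotype contributes only when its LAI code matches the requested ancestry and the genotype call is not missing. A minimal sketch of that masking on a single SNP (plain NumPy, not the snputils internals):

```python
import numpy as np

# One SNP, three samples: (n_snps=1, n_samples=3, n_haplotypes=2).
gt = np.array([[[0, 1], [1, 1], [0, 1]]], dtype=float)
lai = np.array([[[0, 1], [1, 1], [1, 0]]], dtype=int)  # same shape as gt

ancestry = 1
keep = (lai == ancestry) & (gt >= 0)      # ancestry match AND non-missing call
alt = np.where(keep, gt, 0).sum(axis=(1, 2))
called = keep.sum(axis=(1, 2))
af = alt / called

print(af, called)  # [0.75] [4]: 4 haplotypes have LAI==1, carrying alleles 1,1,1,0
```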

4) Chunked/Streaming AF from In-Memory SNPObject

allele_freq_stream(...) computes the same result in SNP chunks, which is useful for memory control on large datasets.

stream_af, stream_called = allele_freq_stream(
    snp_lai,
    chunk_size=3,
    sample_labels=labels,
    ancestry=1,
    return_counts=True,
)

eager_af, eager_called = snp_lai.allele_freq(
    sample_labels=labels,
    ancestry=1,
    return_counts=True,
)

np.testing.assert_allclose(stream_af, eager_af)
np.testing.assert_array_equal(stream_called, eager_called)
print("Chunked and eager ancestry-specific grouped AF match exactly.")
Chunked and eager ancestry-specific grouped AF match exactly.
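Because the stream chunks along the SNP axis, each SNP's numerator and denominator are computed within exactly one chunk, which is why the streamed result matches the eager one bit-for-bit. A hedged standalone sketch of that pattern on random toy data (pure NumPy; the real allele_freq_stream adds grouping and LAI handling):

```python
import numpy as np

rng = np.random.default_rng(0)
gt = rng.integers(0, 2, size=(10, 4, 2)).astype(float)
gt[3, 1] = -1                                  # inject one fully missing sample

# Fill per-SNP numerator/denominator one chunk at a time; divide at the end.
chunk_size = 3
alt = np.zeros(gt.shape[0])
called = np.zeros(gt.shape[0])
for start in range(0, gt.shape[0], chunk_size):
    chunk = gt[start:start + chunk_size]
    valid = chunk >= 0
    alt[start:start + chunk_size] = np.where(valid, chunk, 0).sum(axis=(1, 2))
    called[start:start + chunk_size] = valid.sum(axis=(1, 2))

af_chunked = alt / called

# Same computation without chunking.
valid = gt >= 0
af_eager = np.where(valid, gt, 0).sum(axis=(1, 2)) / valid.sum(axis=(1, 2))
print("chunked == eager:", np.allclose(af_chunked, af_eager))
```

Peak memory is bounded by one chunk of genotypes plus two per-SNP accumulator vectors, which is the property the notebook relies on for large datasets.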

5) Ancestry-Specific SNP Allele Frequencies with LAI Reader/Path

This example computes ancestry-specific SNP allele frequencies (not LAI frequencies).

  • SNP genotypes (calldata_gt) provide the allele values being averaged

  • LAI (laiobj) is used only as a mask to keep haplotypes with LAI == ancestry

laiobj can be provided as either:

  • MSPReader(...) (window-streaming LAI)

  • a path to an .msp file

Below, results are displayed by ancestry label (AFR, EUR) with called-allele counts.

from tempfile import TemporaryDirectory
from snputils.ancestry.io.local.read import MSPReader
import pandas as pd


def _write_toy_msp(path, sample_ids, chromosomes, starts, ends, lai, ancestry_map):
    haplotypes = [f"{sid}.{phase}" for sid in sample_ids for phase in (0, 1)]
    comment = "#Subpopulation order/codes: " + "\t".join(
        f"{label}={code}" for code, label in ancestry_map.items()
    )
    header = ["#chm", "spos", "epos", "sgpos", "egpos", "n snps"] + haplotypes

    with open(path, "w", encoding="utf-8") as handle:
        handle.write(comment + "\n")
        handle.write("\t".join(header) + "\n")
        for w in range(lai.shape[0]):
            prefix = [
                str(int(chromosomes[w])),
                str(int(starts[w])),
                str(int(ends[w])),
                str(int(starts[w])),
                str(int(ends[w])),
                "1",
            ]
            handle.write("\t".join(prefix) + "\t" + "\t".join(map(str, lai[w].tolist())) + "\n")


with TemporaryDirectory() as tmpdir:
    msp_path = Path(tmpdir) / "toy_stream.msp"

    # Two ancestry labels used in the LAI file
    ancestry_map = {0: "AFR", 1: "EUR"}

    sample_ids = ["S0", "S1"]
    chromosomes = np.array([1, 1], dtype=np.int64)
    starts = np.array([1, 101], dtype=np.int64)
    ends = np.array([100, 200], dtype=np.int64)
    lai_windows = np.array(
        [
            [1, 1, 0, 1],
            [0, 1, 1, 0],
        ],
        dtype=np.uint8,
    )
    _write_toy_msp(msp_path, sample_ids, chromosomes, starts, ends, lai_windows, ancestry_map)

    # 6 SNPs (sorted by chromosome/position), all on chr1
    pos_stream = np.array([10, 50, 90, 120, 150, 180], dtype=np.int64)
    chrom_stream = np.array(["1"] * pos_stream.size, dtype=object)

    # Shape: (n_snps=6, n_samples=2, n_haplotypes=2)
    gt_stream = np.array(
        [
            [[0, 1], [1, 1]],  # pos 10
            [[1, 0], [0, 1]],  # pos 50
            [[0, 0], [1, 1]],  # pos 90
            [[1, 1], [0, 0]],  # pos 120
            [[1, 0], [1, 1]],  # pos 150
            [[0, 1], [0, 0]],  # pos 180
        ],
        dtype=float,
    )

    snp_stream = SNPObject(
        calldata_gt=gt_stream,
        variants_chrom=chrom_stream,
        variants_pos=pos_stream,
    )

    frames = []
    for ancestry_code, ancestry_label in sorted(ancestry_map.items()):
        af_reader, called_reader = allele_freq_stream(
            snp_stream,
            ancestry=ancestry_code,
            laiobj=MSPReader(msp_path),
            chunk_size=2,
            lai_chunk_size=1,
            return_counts=True,
        )
        af_path, called_path = allele_freq_stream(
            snp_stream,
            ancestry=ancestry_code,
            laiobj=msp_path,
            chunk_size=2,
            lai_chunk_size=1,
            return_counts=True,
        )

        # Reader and path modes should match exactly
        np.testing.assert_allclose(af_reader, af_path)
        np.testing.assert_array_equal(called_reader, called_path)

        frames.append(
            pd.DataFrame(
                {
                    "chrom": chrom_stream,
                    "pos": pos_stream,
                    "ancestry_code": ancestry_code,
                    "ancestry_label": ancestry_label,
                    "allele_freq": af_reader,
                    "called_alleles": called_reader,
                }
            )
        )

    results = pd.concat(frames, ignore_index=True).sort_values(
        ["chrom", "pos", "ancestry_code"]
    ).reset_index(drop=True)

    display(results)
    print("Interpretation: each row is SNP AF after masking to the ancestry_label haplotypes only.")
chrom pos ancestry_code ancestry_label allele_freq called_alleles
0 1 10 0 AFR 1.000000 1
1 1 10 1 EUR 0.666667 3
2 1 50 0 AFR 0.000000 1
3 1 50 1 EUR 0.666667 3
4 1 90 0 AFR 1.000000 1
5 1 90 1 EUR 0.333333 3
6 1 120 0 AFR 0.500000 2
7 1 120 1 EUR 0.500000 2
8 1 150 0 AFR 1.000000 2
9 1 150 1 EUR 0.500000 2
10 1 180 0 AFR 0.000000 2
11 1 180 1 EUR 0.500000 2
Interpretation: each row is SNP AF after masking to the ancestry_label haplotypes only.
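Under the hood, each SNP position has to be mapped to the MSP window that covers it so the window's LAI row can mask that SNP's haplotypes. One plausible way to express that lookup (a sketch using the toy windows above; the actual snputils implementation may differ):

```python
import numpy as np

# Two LAI windows on chr1 (starts 1 and 101) and the six toy SNP positions.
starts = np.array([1, 101])
lai_windows = np.array([[1, 1, 0, 1],   # window 1: haplotypes S0.0, S0.1, S1.0, S1.1
                        [0, 1, 1, 0]])  # window 2
pos = np.array([10, 50, 90, 120, 150, 180])

# Each SNP belongs to the last window whose start is <= its position.
win_idx = np.searchsorted(starts, pos, side="right") - 1
lai_per_snp = lai_windows[win_idx]      # (n_snps, n_haplotypes)

print(win_idx)         # [0 0 0 1 1 1]
print(lai_per_snp[0])  # [1 1 0 1] -> three EUR (code 1) haplotypes, one AFR at pos 10
```

This is consistent with the table above: at pos 10 only haplotype S1.0 is AFR (called_alleles=1), while the three EUR haplotypes give the 0.666667 frequency.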

6) Optional Reader-Based Chunking (BED, PGEN, VCF/Polars)

If repository test data exists under ../data, this section demonstrates chunked AF directly from readers.

  • BED: BEDReader.iter_read(...)

  • PGEN: PGENReader.iter_read(...)

  • VCF: VCFReaderPolars.iter_read(...)

To keep runtime manageable, this demo caps each format at ~10,000 variants.

data_dir = Path(repo_root) / "data"
paths = {
    "BED": data_dir / "bed" / "subset.bed",
    "PGEN": data_dir / "pgen" / "subset.pgen",
    "VCF": data_dir / "vcf" / "subset.vcf",
}
max_variants = 10_000


def _write_vcf_head(source_path: Path, output_path: Path, n_variants: int) -> None:
    written = 0
    with open(source_path, "r", encoding="utf-8") as src, open(output_path, "w", encoding="utf-8") as dst:
        for line in src:
            dst.write(line)
            if line.startswith("#"):
                continue
            written += 1
            if written >= int(n_variants):
                break


def _first_n_variant_idxs(reader, limit: int) -> np.ndarray:
    n_total = int(reader.read(fields=["ID"]).n_snps)
    return np.arange(min(limit, n_total), dtype=np.uint32)


if all(p.exists() for p in paths.values()):
    bed_reader = BEDReader(paths["BED"])
    pgen_reader = PGENReader(paths["PGEN"])

    bed_variant_idxs = _first_n_variant_idxs(bed_reader, max_variants)
    pgen_variant_idxs = _first_n_variant_idxs(pgen_reader, max_variants)

    mini_vcf_path = data_dir / "vcf" / f"subset_{max_variants}.vcf"
    if not mini_vcf_path.exists():
        _write_vcf_head(paths["VCF"], mini_vcf_path, max_variants)

    readers = {
        "BED": (bed_reader, {"variant_idxs": bed_variant_idxs}),
        "PGEN": (pgen_reader, {"variant_idxs": pgen_variant_idxs}),
        "VCF-Polars": (VCFReaderPolars(mini_vcf_path), {}),
    }

    for name, (reader, reader_kwargs) in readers.items():
        af_stream = allele_freq_stream(
            reader,
            chunk_size=250,
            sum_strands=False,
            **reader_kwargs,
        )
        af_eager = reader.read(sum_strands=False, **reader_kwargs).allele_freq()
        max_abs_diff = float(np.max(np.abs(af_stream - af_eager)))
        print(f"{name}: n_snps={af_stream.shape[0]}, max|stream-eager|={max_abs_diff:.3e}")
else:
    print("Optional reader demo skipped: expected files not found under ../data")
BED: n_snps=10000, max|stream-eager|=0.000e+00
PGEN: n_snps=10000, max|stream-eager|=0.000e+00
VCF-Polars: n_snps=10000, max|stream-eager|=0.000e+00

Notes

  • Missing values (-1 or NaN) are ignored in AF denominators.

  • return_counts=True returns called haplotype counts used for each estimate.

  • as_dataframe=True is useful for grouped output readability.

  • For very large datasets, prefer reader-based streaming (iter_read) + allele_freq_stream to keep memory bounded by chunk size.