SNPObject Allele Frequency Demo¶
This notebook shows:
- Cohort-level allele frequencies
- Grouped allele frequencies (using sample labels)
- Ancestry-specific allele frequencies (using SNP-level LAI)
- Chunked/streaming allele frequencies via allele_freq_stream
- Optional memory-efficient streaming with BED, PGEN, and VCF (and MSP for ancestry-specific)
import os
import sys
from pathlib import Path
import numpy as np
repo_root = os.path.abspath("..")
if repo_root not in sys.path:
    sys.path.append(repo_root)
from snputils.snp.genobj.snpobj import SNPObject
from snputils.snp.io.read import BEDReader, PGENReader
from snputils.snp.io.read.vcf import VCFReaderPolars
from snputils.stats import allele_freq_stream
from IPython.display import display
Synthetic Setup (7 SNPs x 5 Individuals)¶
calldata_gt is 3D here: (n_snps, n_samples, 2) with haplotype values in {0, 1} and missing as -1.
samples = np.array(["S1", "S2", "S3", "S4", "S5"])
labels = np.array(["POP_A", "POP_A", "POP_B", "POP_B", "POP_C"])
gt_3d = np.array(
    [
        [[0, 0], [0, 1], [1, 1], [0, 1], [1, 0]],
        [[0, 1], [1, 1], [0, 0], [1, 0], [0, 0]],
        [[1, 1], [1, 0], [0, 0], [0, 1], [1, 1]],
        [[0, 0], [0, 0], [0, 1], [1, 1], [-1, -1]],
        [[1, 0], [1, 0], [1, 1], [0, 0], [0, 1]],
        [[0, 1], [-1, -1], [0, 0], [1, 0], [1, 1]],
        [[1, 1], [0, 1], [0, 0], [0, 0], [0, 1]],
    ],
    dtype=float,
)
lai_3d = np.array(
    [
        [[0, 0], [0, 1], [1, 1], [1, 0], [0, 1]],
        [[0, 1], [1, 1], [0, 0], [1, 0], [0, 0]],
        [[1, 1], [1, 0], [0, 0], [0, 1], [1, 1]],
        [[0, 0], [0, 0], [1, 0], [1, 1], [0, 1]],
        [[1, 0], [1, 0], [1, 1], [0, 0], [0, 1]],
        [[0, 1], [1, 1], [0, 0], [1, 0], [1, 1]],
        [[1, 1], [0, 1], [0, 0], [0, 0], [0, 1]],
    ],
    dtype=int,
)
print("gt_3d shape:", gt_3d.shape)
print("lai_3d shape:", lai_3d.shape)
gt_3d shape: (7, 5, 2)
lai_3d shape: (7, 5, 2)
1) Cohort-Level Allele Frequency¶
Default behavior (sample_labels=None) computes one AF value per SNP across all samples.
snp = SNPObject(calldata_gt=gt_3d, samples=samples)
af, called = snp.allele_freq(return_counts=True)
print("Cohort AF:")
print(af)
print("Called haplotypes per SNP:")
print(called)
Cohort AF:
[0.5 0.4 0.6 0.375 0.5 0.5 0.4 ]
Called haplotypes per SNP:
[10 10 10 8 10 8 10]
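As a sanity check, the same numbers fall out of a direct numpy computation. This is a minimal sketch assuming AF is the mean over called (non-missing) haplotype values per SNP; it illustrates the convention, not the library's internals.
called_mask = gt_3d >= 0  # -1 marks missing haplotypes
num = np.where(called_mask, gt_3d, 0).reshape(gt_3d.shape[0], -1).sum(axis=1)
den = called_mask.reshape(gt_3d.shape[0], -1).sum(axis=1)
print(num / den)  # matches af above
print(den)        # matches called above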
2) Grouped Allele Frequencies¶
Pass sample_labels to compute per-group AFs.
af_by_group, called_by_group = snp.allele_freq(
    sample_labels=labels,
    return_counts=True,
    as_dataframe=True,
)
print("Grouped AF (DataFrame):")
display(af_by_group)
print("Grouped called haplotypes (DataFrame):")
display(called_by_group)
Grouped AF (DataFrame):

|   | POP_A | POP_B | POP_C |
|---|---|---|---|
| 0 | 0.25 | 0.75 | 0.5 |
| 1 | 0.75 | 0.25 | 0.0 |
| 2 | 0.75 | 0.25 | 1.0 |
| 3 | 0.00 | 0.75 | NaN |
| 4 | 0.50 | 0.50 | 0.5 |
| 5 | 0.50 | 0.25 | 1.0 |
| 6 | 0.75 | 0.00 | 0.5 |

Grouped called haplotypes (DataFrame):

|   | POP_A | POP_B | POP_C |
|---|---|---|---|
| 0 | 4 | 4 | 2 |
| 1 | 4 | 4 | 2 |
| 2 | 4 | 4 | 2 |
| 3 | 4 | 4 | 0 |
| 4 | 4 | 4 | 2 |
| 5 | 2 | 4 | 2 |
| 6 | 4 | 4 | 2 |
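The grouped numbers follow the same rule restricted to each group's samples. Here is a minimal sketch for POP_A under the same missing-as--1 assumption (illustration only, not the library's code path):
grp = gt_3d[:, labels == "POP_A", :]  # keep only POP_A samples
grp_mask = grp >= 0
num_a = np.where(grp_mask, grp, 0).reshape(grp.shape[0], -1).sum(axis=1)
den_a = grp_mask.reshape(grp.shape[0], -1).sum(axis=1)
print(num_a / den_a)  # matches the POP_A column above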
3) Ancestry-Specific Allele Frequencies¶
With ancestry=..., only haplotypes assigned to that ancestry are counted.
Here, ancestry code 1 is selected.
snp_lai = SNPObject(calldata_gt=gt_3d, calldata_lai=lai_3d, samples=samples)
af_anc, called_anc = snp_lai.allele_freq(ancestry=1, return_counts=True)
print("Ancestry-specific cohort AF (ancestry=1):")
print(af_anc)
print("Ancestry-specific called haplotypes:")
print(called_anc)
af_anc_group, called_anc_group = snp_lai.allele_freq(
    sample_labels=labels,
    ancestry=1,
    return_counts=True,
    as_dataframe=True,
)
print("Ancestry-specific grouped AF (DataFrame):")
display(af_anc_group)
print("Ancestry-specific grouped called haplotypes (DataFrame):")
display(called_anc_group)
Ancestry-specific cohort AF (ancestry=1):
[0.6 1. 1. 0.66666667 1. 1.
1. ]
Ancestry-specific called haplotypes:
[5 4 6 3 5 4 4]
Ancestry-specific grouped AF (DataFrame):

|   | POP_A | POP_B | POP_C |
|---|---|---|---|
| 0 | 1.0 | 0.666667 | 0.0 |
| 1 | 1.0 | 1.000000 | NaN |
| 2 | 1.0 | 1.000000 | 1.0 |
| 3 | NaN | 0.666667 | NaN |
| 4 | 1.0 | 1.000000 | 1.0 |
| 5 | 1.0 | 1.000000 | 1.0 |
| 6 | 1.0 | NaN | 1.0 |

Ancestry-specific grouped called haplotypes (DataFrame):

|   | POP_A | POP_B | POP_C |
|---|---|---|---|
| 0 | 1 | 3 | 1 |
| 1 | 3 | 1 | 0 |
| 2 | 3 | 1 | 2 |
| 3 | 0 | 3 | 0 |
| 4 | 2 | 2 | 1 |
| 5 | 1 | 1 | 2 |
| 6 | 3 | 0 | 1 |
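To make the masking explicit: a haplotype contributes only if it is called and its LAI equals the requested ancestry. A minimal numpy sketch of that rule (an illustration of the semantics, not the library's implementation):
anc_mask = (lai_3d == 1) & (gt_3d >= 0)  # called haplotypes assigned ancestry 1
num_anc = np.where(anc_mask, gt_3d, 0).reshape(gt_3d.shape[0], -1).sum(axis=1)
den_anc = anc_mask.reshape(gt_3d.shape[0], -1).sum(axis=1)
print(num_anc / den_anc)  # matches af_anc above
print(den_anc)            # matches called_anc above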
4) Chunked/Streaming AF from In-Memory SNPObject¶
allele_freq_stream(...) computes the same result in SNP chunks, which is useful for memory control on large datasets.
stream_af, stream_called = allele_freq_stream(
    snp_lai,
    chunk_size=3,
    sample_labels=labels,
    ancestry=1,
    return_counts=True,
)
eager_af, eager_called = snp_lai.allele_freq(
    sample_labels=labels,
    ancestry=1,
    return_counts=True,
)
np.testing.assert_allclose(stream_af, eager_af)
np.testing.assert_array_equal(stream_called, eager_called)
print("Chunked and eager ancestry-specific grouped AF match exactly.")
Chunked and eager ancestry-specific grouped AF match exactly.
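Why the chunked and eager paths agree exactly: each SNP's numerator and denominator are contained entirely within its own chunk, so per-chunk results simply concatenate. A conceptual sketch of that accumulation (illustration only, not allele_freq_stream's actual implementation):
nums, dens = [], []
for start in range(0, gt_3d.shape[0], 3):  # SNP chunks of size 3
    block = gt_3d[start:start + 3]
    block_mask = block >= 0
    nums.append(np.where(block_mask, block, 0).reshape(block.shape[0], -1).sum(axis=1))
    dens.append(block_mask.reshape(block.shape[0], -1).sum(axis=1))
af_chunked = np.concatenate(nums) / np.concatenate(dens)
np.testing.assert_allclose(af_chunked, af)  # same as the cohort AF from section 1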
5) Ancestry-Specific SNP Allele Frequencies with LAI Reader/Path¶
This example computes ancestry-specific SNP allele frequencies (not LAI frequencies).
- SNP genotypes (calldata_gt) provide the allele values being averaged
- LAI (laiobj) is used only as a mask to keep haplotypes with LAI == ancestry

laiobj can be provided as either:

- MSPReader(...) (window-streaming LAI)
- a path to an .msp file

Below, results are displayed by ancestry label (AFR, EUR) with called-allele counts.
from tempfile import TemporaryDirectory
from snputils.ancestry.io.local.read import MSPReader
import pandas as pd
def _write_toy_msp(path, sample_ids, chromosomes, starts, ends, lai, ancestry_map):
    # Minimal .msp writer: ancestry-code comment, header row, then one row per LAI window.
    haplotypes = [f"{sid}.{phase}" for sid in sample_ids for phase in (0, 1)]
    comment = "#Subpopulation order/codes: " + "\t".join(
        f"{label}={code}" for code, label in ancestry_map.items()
    )
    header = ["#chm", "spos", "epos", "sgpos", "egpos", "n snps"] + haplotypes
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(comment + "\n")
        handle.write("\t".join(header) + "\n")
        for w in range(lai.shape[0]):
            prefix = [
                str(int(chromosomes[w])),
                str(int(starts[w])),
                str(int(ends[w])),
                str(int(starts[w])),
                str(int(ends[w])),
                "1",
            ]
            handle.write("\t".join(prefix) + "\t" + "\t".join(map(str, lai[w].tolist())) + "\n")
with TemporaryDirectory() as tmpdir:
    msp_path = Path(tmpdir) / "toy_stream.msp"

    # Two ancestry labels used in the LAI file
    ancestry_map = {0: "AFR", 1: "EUR"}
    sample_ids = ["S0", "S1"]
    chromosomes = np.array([1, 1], dtype=np.int64)
    starts = np.array([1, 101], dtype=np.int64)
    ends = np.array([100, 200], dtype=np.int64)
    # One row per window; columns are haplotypes S0.0, S0.1, S1.0, S1.1
    lai_windows = np.array(
        [
            [1, 1, 0, 1],
            [0, 1, 1, 0],
        ],
        dtype=np.uint8,
    )
    _write_toy_msp(msp_path, sample_ids, chromosomes, starts, ends, lai_windows, ancestry_map)

    # 6 SNPs (sorted by chromosome/position), all on chr1
    pos_stream = np.array([10, 50, 90, 120, 150, 180], dtype=np.int64)
    chrom_stream = np.array(["1"] * pos_stream.size, dtype=object)

    # Shape: (n_snps=6, n_samples=2, n_haplotypes=2)
    gt_stream = np.array(
        [
            [[0, 1], [1, 1]],  # pos 10
            [[1, 0], [0, 1]],  # pos 50
            [[0, 0], [1, 1]],  # pos 90
            [[1, 1], [0, 0]],  # pos 120
            [[1, 0], [1, 1]],  # pos 150
            [[0, 1], [0, 0]],  # pos 180
        ],
        dtype=float,
    )
    snp_stream = SNPObject(
        calldata_gt=gt_stream,
        variants_chrom=chrom_stream,
        variants_pos=pos_stream,
    )

    frames = []
    for ancestry_code, ancestry_label in sorted(ancestry_map.items()):
        af_reader, called_reader = allele_freq_stream(
            snp_stream,
            ancestry=ancestry_code,
            laiobj=MSPReader(msp_path),
            chunk_size=2,
            lai_chunk_size=1,
            return_counts=True,
        )
        af_path, called_path = allele_freq_stream(
            snp_stream,
            ancestry=ancestry_code,
            laiobj=msp_path,
            chunk_size=2,
            lai_chunk_size=1,
            return_counts=True,
        )
        # Reader and path modes should match exactly
        np.testing.assert_allclose(af_reader, af_path)
        np.testing.assert_array_equal(called_reader, called_path)
        frames.append(
            pd.DataFrame(
                {
                    "chrom": chrom_stream,
                    "pos": pos_stream,
                    "ancestry_code": ancestry_code,
                    "ancestry_label": ancestry_label,
                    "allele_freq": af_reader,
                    "called_alleles": called_reader,
                }
            )
        )

    results = pd.concat(frames, ignore_index=True).sort_values(
        ["chrom", "pos", "ancestry_code"]
    ).reset_index(drop=True)
    display(results)
    print("Interpretation: each row is the SNP AF after masking to the ancestry_label haplotypes only.")
|   | chrom | pos | ancestry_code | ancestry_label | allele_freq | called_alleles |
|---|---|---|---|---|---|---|
| 0 | 1 | 10 | 0 | AFR | 1.000000 | 1 |
| 1 | 1 | 10 | 1 | EUR | 0.666667 | 3 |
| 2 | 1 | 50 | 0 | AFR | 0.000000 | 1 |
| 3 | 1 | 50 | 1 | EUR | 0.666667 | 3 |
| 4 | 1 | 90 | 0 | AFR | 1.000000 | 1 |
| 5 | 1 | 90 | 1 | EUR | 0.333333 | 3 |
| 6 | 1 | 120 | 0 | AFR | 0.500000 | 2 |
| 7 | 1 | 120 | 1 | EUR | 0.500000 | 2 |
| 8 | 1 | 150 | 0 | AFR | 1.000000 | 2 |
| 9 | 1 | 150 | 1 | EUR | 0.500000 | 2 |
| 10 | 1 | 180 | 0 | AFR | 0.000000 | 2 |
| 11 | 1 | 180 | 1 | EUR | 0.500000 | 2 |
Interpretation: each row is the SNP AF after masking to the ancestry_label haplotypes only.
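For intuition about the masking above: each SNP position falls inside exactly one [spos, epos] window of the toy .msp file and inherits that window's per-haplotype ancestry codes. A sketch of that lookup for the toy data (an assumed mapping for illustration; MSPReader handles this internally):
win_idx = np.searchsorted(ends, pos_stream, side="left")  # covering window per SNP
lai_per_snp = lai_windows[win_idx]  # shape (n_snps, 4); columns are S0.0, S0.1, S1.0, S1.1
print(lai_per_snp)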
6) Optional Reader-Based Chunking (BED, PGEN, VCF/Polars)¶
If repository test data exists under ../data, this section demonstrates chunked AF directly from readers:

- BED: BEDReader.iter_read(...)
- PGEN: PGENReader.iter_read(...)
- VCF: VCFReaderPolars.iter_read(...)

To keep runtime manageable, this demo is limited to ~10,000 variants per format.
data_dir = Path(repo_root) / "data"
paths = {
    "BED": data_dir / "bed" / "subset.bed",
    "PGEN": data_dir / "pgen" / "subset.pgen",
    "VCF": data_dir / "vcf" / "subset.vcf",
}
max_variants = 10_000
def _write_vcf_head(source_path: Path, output_path: Path, n_variants: int) -> None:
    # Copy the full VCF header plus the first n_variants records to output_path.
    written = 0
    with open(source_path, "r", encoding="utf-8") as src, open(output_path, "w", encoding="utf-8") as dst:
        for line in src:
            dst.write(line)
            if line.startswith("#"):
                continue
            written += 1
            if written >= int(n_variants):
                break

def _first_n_variant_idxs(reader, limit: int) -> np.ndarray:
    # Index the first `limit` variants (reading only the ID field to get n_snps cheaply).
    n_total = int(reader.read(fields=["ID"]).n_snps)
    return np.arange(min(limit, n_total), dtype=np.uint32)

if all(p.exists() for p in paths.values()):
    bed_reader = BEDReader(paths["BED"])
    pgen_reader = PGENReader(paths["PGEN"])
    bed_variant_idxs = _first_n_variant_idxs(bed_reader, max_variants)
    pgen_variant_idxs = _first_n_variant_idxs(pgen_reader, max_variants)

    mini_vcf_path = data_dir / "vcf" / f"subset_{max_variants}.vcf"
    if not mini_vcf_path.exists():
        _write_vcf_head(paths["VCF"], mini_vcf_path, max_variants)

    readers = {
        "BED": (bed_reader, {"variant_idxs": bed_variant_idxs}),
        "PGEN": (pgen_reader, {"variant_idxs": pgen_variant_idxs}),
        "VCF-Polars": (VCFReaderPolars(mini_vcf_path), {}),
    }

    for name, (reader, reader_kwargs) in readers.items():
        af_stream = allele_freq_stream(
            reader,
            chunk_size=250,
            sum_strands=False,
            **reader_kwargs,
        )
        af_eager = reader.read(sum_strands=False, **reader_kwargs).allele_freq()
        max_abs_diff = float(np.max(np.abs(af_stream - af_eager)))
        print(f"{name}: n_snps={af_stream.shape[0]}, max|stream-eager|={max_abs_diff:.3e}")
else:
    print("Optional reader demo skipped: expected files not found under ../data")
BED: n_snps=10000, max|stream-eager|=0.000e+00
PGEN: n_snps=10000, max|stream-eager|=0.000e+00
VCF-Polars: n_snps=10000, max|stream-eager|=0.000e+00
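For reference, the accumulation pattern behind reader-based streaming looks roughly like the sketch below. It assumes iter_read(...) yields SNPObject-like chunks exposing a calldata_gt array; the real signatures and chunk types are defined by snputils, so treat this purely as an illustration.
def manual_stream_af(chunks):
    # Accumulate per-SNP numerators/denominators chunk by chunk, e.g. over reader.iter_read(...).
    nums, dens = [], []
    for chunk in chunks:
        g = np.nan_to_num(np.asarray(chunk.calldata_gt, dtype=float), nan=-1.0)
        mask = g >= 0  # treat -1 (and former NaN) as missing
        nums.append(np.where(mask, g, 0).reshape(g.shape[0], -1).sum(axis=1))
        dens.append(mask.reshape(g.shape[0], -1).sum(axis=1))
    return np.concatenate(nums) / np.concatenate(dens)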
Notes¶
- Missing values (-1 or NaN) are ignored in AF denominators.
- return_counts=True returns the called haplotype counts used for each estimate.
- as_dataframe=True is useful for grouped output readability.
- For very large datasets, prefer reader-based streaming (iter_read) + allele_freq_stream to keep memory bounded by chunk size.