from __future__ import annotations
import math
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd
from snputils._utils.allele_freq import aggregate_pop_allele_freq
def genomic_block_labels(
    chrom: Sequence[Any],
    pos: Sequence[int],
    block_size_bp: int,
) -> np.ndarray:
    """
    Build genomic block labels from chromosome and base-pair position arrays.

    A new block starts when the chromosome changes or when the current variant is at least
    ``block_size_bp`` bases away from the start of the current block on that chromosome.
    Block indices run globally across chromosomes, so every label ``"chrom:index"``
    is unique.
    """
    if block_size_bp <= 0:
        raise ValueError("block_size_bp must be positive.")
    chrom_values = np.asarray(chrom)
    pos_values = np.asarray(pos)
    if chrom_values.shape[0] != pos_values.shape[0]:
        raise ValueError("'chrom' and 'pos' must have the same length.")
    out: List[str] = []
    active_chrom: Optional[str] = None
    block_start_bp = -1
    running_index = -1
    for raw_chrom, raw_pos in zip(chrom_values, pos_values):
        name = str(raw_chrom)
        bp = int(raw_pos)
        # Open a new block on a chromosome switch or once the span is exceeded.
        if name != active_chrom or bp - block_start_bp >= block_size_bp:
            active_chrom = name
            block_start_bp = bp
            running_index += 1
        out.append(f"{name}:{running_index}")
    return np.asarray(out, dtype=object)
@dataclass
class BlockJackknifeResult:
    """Container for a block-jackknife point estimate and its uncertainty."""

    est: float  # point estimate of the statistic
    se: float  # block-jackknife standard error
    z: float  # est / se (NaN when the SE is zero or undefined)
    p: float  # two-sided p-value, erfc(|z| / sqrt(2)) under a normal reference
    n_blocks: int  # number of blocks that contributed to the estimate
    n_snps: int  # number of SNPs that contributed to the estimate
def _compute_block_indices(
    n_snps: int,
    size: int = 5000,
    block_labels: Optional[np.ndarray] = None,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Build SNP-to-block assignments using fixed SNP counts per block.

    Args:
        n_snps: Total number of SNPs to assign.
        size: Number of SNPs per block (clamped to at least 1). Ignored when
            ``block_labels`` is provided.
        block_labels: Optional explicit per-SNP block label; labels are recoded
            to 0..B-1 preserving first-appearance order.

    Returns:
        block_ids (int array of shape (n_snps,)): Block index for each SNP (0..B-1)
        block_lengths (int array of shape (n_blocks,)): Number of SNPs per block
    """
    if block_labels is not None:
        block_labels = np.asarray(block_labels)
        if block_labels.shape[0] != n_snps:
            raise ValueError("'block_labels' must have length n_snps")
        # Reindex to 0..B-1 preserving first appearance order
        codes, uniques = pd.factorize(block_labels, sort=False)
        counts = np.bincount(codes, minlength=len(uniques))
        return codes.astype(np.int32), counts.astype(np.int32)
    if n_snps <= 0:
        # Guard the empty case: np.arange(0).max() below would raise
        # "zero-size array to reduction" for zero SNPs.
        return np.empty(0, dtype=np.int32), np.empty(0, dtype=np.int32)
    size = max(1, int(size))
    block_ids = np.arange(n_snps) // size
    # block lengths: last block may be shorter
    n_blocks = int(block_ids.max()) + 1
    block_lengths = np.bincount(block_ids, minlength=n_blocks)
    return block_ids.astype(np.int32), block_lengths.astype(np.int32)
def _jackknife_ratio_from_block_sums(
    num_block_sums: np.ndarray,
    den_block_sums: np.ndarray,
) -> BlockJackknifeResult:
    """
    Compute statistic = sum(num) / sum(den) and its block-jackknife SE using delete-one blocks.
    """
    num = np.asarray(num_block_sums, dtype=float)
    den = np.asarray(den_block_sums, dtype=float)
    # Keep negative denominators, drop only non-finite and exactly-zero denominators
    keep = np.isfinite(num) & np.isfinite(den) & (den != 0)
    if not keep.any():
        nan = float("nan")
        return BlockJackknifeResult(nan, nan, nan, nan, 0, 0)
    num = num[keep]
    den = den[keep]
    nb = int(keep.sum())
    n_snps = int(np.sum(den))
    num_total = float(num.sum())
    den_total = float(den.sum())
    if den_total == 0:
        nan = float("nan")
        return BlockJackknifeResult(nan, nan, nan, nan, nb, n_snps)
    est = num_total / den_total
    # Delete-one estimates, one per retained block.
    with np.errstate(divide="ignore", invalid="ignore"):
        loo = (num_total - num) / (den_total - den)
    # Remove degenerate cases where the denominator becomes zero after deletion.
    loo = loo[np.isfinite(loo)]
    m = loo.size
    if m == 0:
        return BlockJackknifeResult(est, 0.0, float("nan"), float("nan"), nb, n_snps)
    # Standard delete-one jackknife variance.
    se = float(np.sqrt((m - 1) / m * np.sum((loo - est) ** 2)))
    z = float(est / se) if se > 0 else float("nan")
    p = float(math.erfc(abs(z) / math.sqrt(2))) if np.isfinite(z) else float("nan")
    return BlockJackknifeResult(est, se, z, p, nb, n_snps)
def _weighted_jackknife_from_block_estimates(
    block_estimates: np.ndarray,
    block_lengths: np.ndarray,
) -> BlockJackknifeResult:
    """
    Weighted delete-one block jackknife over precomputed per-block estimates.

    Each block carries its own estimate of the statistic and is weighted by its
    SNP count. Leave-one-out estimates are reconstructed analytically from the
    weighted total instead of re-aggregating the underlying data.

    Args:
        block_estimates: Per-block estimates, shape (n_blocks,). NaN entries are dropped.
        block_lengths: Per-block SNP counts used as weights, shape (n_blocks,).

    Returns:
        BlockJackknifeResult with the weighted jackknife estimate, SE, z, and p.
    """
    block_estimates = np.asarray(block_estimates, dtype=float)
    block_lengths = np.asarray(block_lengths, dtype=float)
    # A block is usable only when both its estimate and positive weight are finite.
    valid = (
        np.isfinite(block_estimates)
        & np.isfinite(block_lengths)
        & (block_lengths > 0)
    )
    if not np.any(valid):
        return BlockJackknifeResult(float("nan"), float("nan"), float("nan"), float("nan"), 0, 0)
    x = block_estimates[valid]
    w = block_lengths[valid]
    n_snps = int(np.sum(w))
    nb = int(w.size)
    if nb < 2:
        # With a single usable block, report its estimate but no SE is defined.
        est = float(np.average(x, weights=w))
        return BlockJackknifeResult(est, float("nan"), float("nan"), float("nan"), nb, n_snps)
    weight_sum = float(np.sum(w))
    tot = float(np.average(x, weights=w))
    rel = w / weight_sum  # relative weight of each block
    with np.errstate(divide="ignore", invalid="ignore"):
        # Analytic leave-one-out estimate from the identity
        # tot = rel*x + (1-rel)*loo; degenerates when rel == 1.
        loo = (tot - x * rel) / (1.0 - rel)
    finite = np.isfinite(loo)
    if not np.any(finite):
        return BlockJackknifeResult(tot, float("nan"), float("nan"), float("nan"), nb, n_snps)
    loo = loo[finite]
    w = w[finite]
    weight_sum = float(np.sum(w))
    nb = int(w.size)
    h = weight_sum / w  # inverse relative weight per block (W / w_j)
    # Point estimate: average of delete-one estimates weighted by (1 - w_j/W).
    est = float(np.average(loo, weights=(1.0 - 1.0 / h)))
    # Weighted jackknife variance: squared deviations scaled by (h_j - 1).
    se = float(np.sqrt(np.mean((est - loo) ** 2 * (h - 1.0))))
    z = float(est / se) if se > 0 else float("nan")
    p = float(math.erfc(abs(z) / math.sqrt(2))) if np.isfinite(z) else float("nan")
    return BlockJackknifeResult(est, se, z, p, nb, int(weight_sum))
def _jackknife_block_ratio_estimates(
    num_block_sums: np.ndarray,
    den_block_sums: np.ndarray,
    block_lengths: np.ndarray,
    *,
    min_abs_den: float = 1e-12,
) -> BlockJackknifeResult:
    """Form per-block ratio estimates and jackknife them with SNP-count weights.

    Blocks whose denominator magnitude is at or below ``min_abs_den`` are set
    to NaN so the weighted jackknife drops them.
    """
    den = np.asarray(den_block_sums, dtype=float)
    with np.errstate(divide="ignore", invalid="ignore"):
        ratios = np.asarray(num_block_sums, dtype=float) / den
    ratios = np.where(np.abs(den) > min_abs_den, ratios, np.nan)
    return _weighted_jackknife_from_block_estimates(ratios, block_lengths)
def _weighted_jackknife_ratio_from_block_sums(
    num_block_sums: np.ndarray,
    den_block_sums: np.ndarray,
    block_lengths: np.ndarray,
    *,
    denominator_est_threshold: float = 1e-6,
) -> BlockJackknifeResult:
    """
    Compute a weighted delete-one block jackknife for a ratio of block means.

    Args:
        num_block_sums: Per-block numerator sums, shape (n_blocks,).
        den_block_sums: Per-block denominator sums, shape (n_blocks,).
        block_lengths: SNPs per block; used both to form block means and as
            jackknife weights.
        denominator_est_threshold: Blocks whose mean denominator magnitude is
            below this are dropped as numerically unstable.

    Returns:
        BlockJackknifeResult with the bias-corrected weighted jackknife
        estimate, SE, z-score, and two-sided normal p-value.
    """
    num_block_sums = np.asarray(num_block_sums, dtype=float)
    den_block_sums = np.asarray(den_block_sums, dtype=float)
    block_lengths = np.asarray(block_lengths, dtype=float)
    # Convert block sums to block means (invalid for empty blocks; masked below).
    with np.errstate(divide="ignore", invalid="ignore"):
        num_blocks = num_block_sums / block_lengths
        den_blocks = den_block_sums / block_lengths
    valid = (
        np.isfinite(num_blocks)
        & np.isfinite(den_blocks)
        & np.isfinite(block_lengths)
        & (block_lengths > 0)
        & (np.abs(den_blocks) >= denominator_est_threshold)
    )
    if not np.any(valid):
        return BlockJackknifeResult(float("nan"), float("nan"), float("nan"), float("nan"), 0, 0)
    num_b = num_blocks[valid]
    den_b = den_blocks[valid]
    weights = block_lengths[valid]
    n_snps = int(np.sum(weights))
    nb = int(weights.size)
    weight_sum = float(np.sum(weights))
    # Full-data estimate: ratio of the SNP-weighted mean numerator/denominator.
    tot_num = float(np.sum(num_b * weights) / weight_sum)
    tot_den = float(np.sum(den_b * weights) / weight_sum)
    if tot_den == 0:
        return BlockJackknifeResult(float("nan"), float("nan"), float("nan"), float("nan"), nb, n_snps)
    tot = tot_num / tot_den
    rel = weights / weight_sum  # relative weight of each block
    with np.errstate(divide="ignore", invalid="ignore"):
        # Analytic leave-one-out block means: removing block j rescales the
        # weighted mean by 1 / (1 - rel_j); degenerates when rel_j == 1.
        loo_num = (tot_num - num_b * rel) / (1.0 - rel)
        loo_den = (tot_den - den_b * rel) / (1.0 - rel)
        loo_ratio = loo_num / loo_den
    finite = np.isfinite(loo_ratio)
    if not np.any(finite):
        # NOTE(review): se is reported as 0.0 here while sibling helpers report
        # NaN in the analogous degenerate case — confirm this is intended.
        return BlockJackknifeResult(tot, 0.0, float("nan"), float("nan"), nb, n_snps)
    loo_ratio = loo_ratio[finite]
    weights = weights[finite]
    nb2 = int(weights.size)
    weight_sum = float(np.sum(weights))
    # Bias-corrected point estimate; algebraically equal to
    # nb2*tot - sum_j (1 - w_j/W) * loo_j, i.e. the weighted delete-one
    # jackknife estimator (cf. Busing et al. 1999).
    est = float(np.mean(tot - loo_ratio) * nb2 + np.sum(loo_ratio * weights) / weight_sum)
    h = weight_sum / weights  # inverse relative weight per block (W / w_j)
    # Pseudovalues and the matching weighted jackknife variance.
    tau = h * tot - (h - 1.0) * loo_ratio
    se = float(np.sqrt(np.mean((tau - est) ** 2 / (h - 1.0))))
    z = float(est / se) if se > 0 else float("nan")
    # Two-sided p-value under a standard normal reference.
    p = float(math.erfc(abs(z) / math.sqrt(2))) if np.isfinite(z) else float("nan")
    return BlockJackknifeResult(est, se, z, p, nb2, n_snps)
def _aggregate_to_pop_allele_freq(
    calldata_gt: np.ndarray,
    sample_labels: Sequence[str],
    *,
    ancestry: Optional[Union[str, int]] = None,
    snpobj: Optional[Any] = None,
    laiobj: Optional[Any] = None,
    pseudohaploid: Union[bool, int] = False,
) -> Tuple[np.ndarray, np.ndarray, List[str]]:
    """
    Convert sample-level genotypes into per-population allele frequency and count matrices.
    """
    calldata_lai = None
    if ancestry is not None:
        # Prefer SNP-level LAI already attached to the SNPObject; otherwise try
        # to derive it from the provided LocalAncestryObject (best effort).
        attached = getattr(snpobj, "calldata_lai", None) if snpobj is not None else None
        if attached is not None:
            calldata_lai = attached
        elif laiobj is not None:
            try:
                converted = laiobj.convert_to_snp_level(snpobject=snpobj, lai_format="3D")
                calldata_lai = getattr(converted, "calldata_lai", None)
            except Exception:
                calldata_lai = None
        if calldata_lai is None:
            raise ValueError(
                "Ancestry-specific masking requires SNP-level LAI "
                "(provide a LocalAncestryObject via 'laiobj' or ensure 'snpobj.calldata_lai' is set)."
            )
    return aggregate_pop_allele_freq(
        calldata_gt=calldata_gt,
        sample_labels=sample_labels,
        ancestry=ancestry,
        calldata_lai=calldata_lai,
        pseudohaploid=pseudohaploid,
    )
def _default_sample_labels_from_snpobj(snpobj: Any) -> List[str]:
    """
    Default per-sample group labels for SNPObject inputs.

    If ``sample_fid`` is set on the object and differs from ``samples`` for at least
    one individual (PLINK FID vs IID), use ``sample_fid``. Otherwise use ``samples`` (IID) as the label
    for each individual.
    """
    if snpobj.samples is None:
        raise ValueError("sample_labels must be provided when SNPObject.samples is None")
    iids = np.asarray(snpobj.samples, dtype=object)
    fids = getattr(snpobj, "sample_fid", None)
    if fids is None:
        return [str(s) for s in iids]
    fids = np.asarray(fids, dtype=object)
    if fids.shape[0] != iids.shape[0]:
        raise ValueError("'sample_fid' must have the same length as 'samples'")
    # Use FIDs only when they carry information beyond the IIDs themselves.
    use_fid = bool(np.any(fids.astype(str) != iids.astype(str)))
    chosen = fids if use_fid else iids
    return [str(s) for s in chosen]
def _prepare_inputs(
    data: Union[Any, Tuple[np.ndarray, np.ndarray, List[str]]],
    sample_labels: Optional[Sequence[str]] = None,
    *,
    ancestry: Optional[Union[str, int]] = None,
    laiobj: Optional[Any] = None,
    pseudohaploid: Union[bool, int] = False,
) -> Tuple[np.ndarray, np.ndarray, List[str]]:
    """
    Normalize inputs to allele frequencies, counts, and optional variant metadata.
    """
    # Import SNPObject lazily so this module remains usable without it.
    try:
        from snputils.snp.genobj.snpobj import SNPObject  # type: ignore
    except Exception:
        snpobj_cls = None
    else:
        snpobj_cls = SNPObject
    if snpobj_cls is not None and isinstance(data, snpobj_cls):
        labels = sample_labels if sample_labels is not None else _default_sample_labels_from_snpobj(data)
        return _aggregate_to_pop_allele_freq(
            data.calldata_gt,
            labels,
            ancestry=ancestry,
            snpobj=data,
            laiobj=laiobj,
            pseudohaploid=pseudohaploid,
        )
    if isinstance(data, tuple) and len(data) == 3:
        afs, counts, pops = data
        return np.asarray(afs), np.asarray(counts), list(pops)
    raise ValueError(
        "data must be either a SNPObject or a tuple (afs, counts, pops) where afs/counts have shape (n_snps, n_pops)"
    )
def _build_blocks(
    n_snps: int,
    blocks: Optional[np.ndarray],
    block_size: int,
) -> Tuple[np.ndarray, np.ndarray]:
    """Resolve explicit per-SNP block labels, or fixed-size blocks, into (ids, lengths)."""
    if blocks is None:
        return _compute_block_indices(n_snps=n_snps, size=block_size)
    labels = np.asarray(blocks)
    if labels.shape[0] != n_snps:
        raise ValueError("'blocks' must have length n_snps")
    return _compute_block_indices(n_snps, block_labels=labels)
def _block_ids_are_contiguous(block_ids: np.ndarray, block_lengths: np.ndarray) -> bool:
    """Return True when block ids form sorted, gap-free runs matching block_lengths."""
    if block_ids.size == 0:
        return True
    if int(block_lengths.sum()) != block_ids.size:
        return False
    # Ids must be non-decreasing for the blocks to be contiguous runs.
    if np.any(np.diff(block_ids) < 0):
        return False
    observed = np.bincount(block_ids, minlength=block_lengths.size)
    return bool(np.array_equal(observed[: block_lengths.size], block_lengths))
def _complete_block_product_sums(
    afs: np.ndarray,
    counts: np.ndarray,
    block_ids: np.ndarray,
    block_lengths: np.ndarray,
    *,
    require_counts_gt: int,
    need_correction: bool,
) -> Optional[Tuple[np.ndarray, Optional[np.ndarray], np.ndarray]]:
    """
    Precompute block sums of p_i * p_j for complete-data f-stat calculations.

    This fast path is exact for cases where every requested SNP has valid allele
    frequencies and sufficient haplotype counts for all populations. Missing or
    low-count data falls back to the per-statistic masking logic.

    Args:
        afs: Allele frequencies of shape (n_snps, n_pops).
        counts: Haplotype counts, same shape as ``afs``.
        block_ids: Block index per SNP.
        block_lengths: Number of SNPs per block.
        require_counts_gt: Every count must strictly exceed this value.
        need_correction: Also accumulate p*(1-p)/(n-1) per block and population.

    Returns:
        None when the data are not complete; otherwise
        (pair_sums (n_blocks, n_pops, n_pops), corr_sums (n_blocks, n_pops) or
        None, den_block_sums (n_blocks,) == block lengths as float).
    """
    afs = np.asarray(afs, dtype=float)
    counts = np.asarray(counts)
    if afs.ndim != 2 or counts.shape != afs.shape:
        return None
    if not np.isfinite(afs).all() or not np.all(counts > require_counts_gt):
        return None
    n_pops = afs.shape[1]
    n_blocks = block_lengths.size
    den_block_sums = block_lengths.astype(float, copy=False)
    pair_sums = np.empty((n_blocks, n_pops, n_pops), dtype=float)
    corr_sums = np.empty((n_blocks, n_pops), dtype=float) if need_correction else None
    if n_blocks == 0:
        # Zero blocks: return the empty accumulators directly; the slicing
        # branch below would otherwise index block_lengths[0] and raise.
        return pair_sums, corr_sums, den_block_sums
    if _block_ids_are_contiguous(block_ids, block_lengths):
        # Contiguous layout: slice each block once and use a single matmul.
        starts = np.r_[0, np.cumsum(block_lengths[:-1])]
        for b_idx, start in enumerate(starts):
            end = int(start + block_lengths[b_idx])
            block_afs = afs[int(start):end]
            pair_sums[b_idx] = block_afs.T @ block_afs
            if corr_sums is not None:
                block_counts = counts[int(start):end].astype(float, copy=False)
                with np.errstate(divide="ignore", invalid="ignore"):
                    corr_sums[b_idx] = np.sum(block_afs * (1.0 - block_afs) / (block_counts - 1.0), axis=0)
    else:
        # Scattered layout: accumulate per population pair via bincount.
        for i in range(n_pops):
            ai = afs[:, i]
            for j in range(n_pops):
                pair_sums[:, i, j] = np.bincount(
                    block_ids,
                    weights=ai * afs[:, j],
                    minlength=n_blocks,
                )
            if corr_sums is not None:
                ci = counts[:, i].astype(float, copy=False)
                with np.errstate(divide="ignore", invalid="ignore"):
                    corr = ai * (1.0 - ai) / (ci - 1.0)
                corr_sums[:, i] = np.bincount(block_ids, weights=corr, minlength=n_blocks)
    return pair_sums, corr_sums, den_block_sums
def _complete_block_hudson_fst_sums(
    afs: np.ndarray,
    counts: np.ndarray,
    block_ids: np.ndarray,
    block_lengths: np.ndarray,
) -> Optional[Tuple[np.ndarray, np.ndarray, np.ndarray]]:
    """
    Precompute complete-data block sums needed by Hudson F_ST.

    Returns None unless the data are complete (2D, matching shapes, finite AFs,
    all counts > 1). Otherwise returns:
        af_sums (n_blocks, n_pops): per-block sums of allele frequency p.
        pair_sums (n_blocks, n_pops, n_pops): per-block sums of p_i * p_j.
        pi_sums (n_blocks, n_pops): per-block sums of 2*p*(1-p)*n/(n-1),
            the count-corrected within-population heterozygosity.
    """
    afs = np.asarray(afs, dtype=float)
    counts = np.asarray(counts)
    if afs.ndim != 2 or counts.shape != afs.shape:
        return None
    # counts > 1 guarantees the n/(n-1) factor below is well defined.
    if not np.isfinite(afs).all() or not np.all(counts > 1):
        return None
    n_snps, n_pops = afs.shape
    n_blocks = block_lengths.size
    af_sums = np.empty((n_blocks, n_pops), dtype=float)
    pair_sums = np.empty((n_blocks, n_pops, n_pops), dtype=float)
    pi_sums = np.empty((n_blocks, n_pops), dtype=float)
    # NOTE(review): when block_lengths is empty, starts becomes [0] and
    # block_lengths[0] below raises IndexError — callers appear to guarantee
    # at least one block; confirm.
    if _block_ids_are_contiguous(block_ids, block_lengths):
        # Contiguous layout: slice each block and use one matmul per block.
        starts = np.r_[0, np.cumsum(block_lengths[:-1])]
        for b_idx, start in enumerate(starts):
            end = int(start + block_lengths[b_idx])
            block_afs = afs[int(start):end]
            block_counts = counts[int(start):end].astype(float, copy=False)
            af_sums[b_idx] = np.sum(block_afs, axis=0)
            pair_sums[b_idx] = block_afs.T @ block_afs
            with np.errstate(divide="ignore", invalid="ignore"):
                pi_sums[b_idx] = np.sum(
                    2.0 * block_afs * (1.0 - block_afs) * (block_counts / (block_counts - 1.0)),
                    axis=0,
                )
    else:
        # Scattered layout: accumulate per-population block sums via bincount.
        for i in range(n_pops):
            ai = afs[:, i]
            af_sums[:, i] = np.bincount(block_ids, weights=ai, minlength=n_blocks)
            with np.errstate(divide="ignore", invalid="ignore"):
                pi = 2.0 * ai * (1.0 - ai) * (counts[:, i].astype(float, copy=False) / (counts[:, i] - 1.0))
            pi_sums[:, i] = np.bincount(block_ids, weights=pi, minlength=n_blocks)
            for j in range(n_pops):
                pair_sums[:, i, j] = np.bincount(
                    block_ids,
                    weights=ai * afs[:, j],
                    minlength=n_blocks,
                )
    return af_sums, pair_sums, pi_sums
def _complete_block_pair_het_product_sums(
    afs: np.ndarray,
    counts: np.ndarray,
    block_ids: np.ndarray,
    block_lengths: np.ndarray,
    pairs: Sequence[Tuple[int, int]],
) -> Optional[Tuple[np.ndarray, Dict[Tuple[int, int], int]]]:
    """
    Precompute block sums of h_ij * h_kl, where h_ij = p_i + p_j - 2 p_i p_j.

    Args:
        afs: Allele frequencies of shape (n_snps, n_pops).
        counts: Haplotype counts, same shape as ``afs``.
        block_ids: Block index per SNP.
        block_lengths: Number of SNPs per block.
        pairs: Population index pairs; normalized to unordered (i <= j) pairs.

    Returns:
        None unless the data are complete (finite AFs, all counts > 0) and at
        least one pair was requested; otherwise
        (out (n_blocks, n_pairs, n_pairs), pair_to_idx mapping each normalized
        pair to its column index).
    """
    afs = np.asarray(afs, dtype=float)
    counts = np.asarray(counts)
    if afs.ndim != 2 or counts.shape != afs.shape:
        return None
    if not np.isfinite(afs).all() or not np.all(counts > 0):
        return None
    # Deduplicate and normalize to sorted unordered pairs.
    norm_pairs = sorted({(min(i, j), max(i, j)) for i, j in pairs})
    if not norm_pairs:
        return None
    pair_to_idx = {pair: idx for idx, pair in enumerate(norm_pairs)}
    n_blocks = block_lengths.size
    n_pairs = len(norm_pairs)

    def _het_matrix(block_afs: np.ndarray) -> np.ndarray:
        # Per-SNP between-pair heterozygosity h_ij = p_i + p_j - 2*p_i*p_j.
        h = np.empty((block_afs.shape[0], n_pairs), dtype=float)
        for pair_idx, (i, j) in enumerate(norm_pairs):
            pi = block_afs[:, i]
            pj = block_afs[:, j]
            h[:, pair_idx] = pi + pj - 2.0 * pi * pj
        return h

    out = np.empty((n_blocks, n_pairs, n_pairs), dtype=float)
    if n_blocks == 0:
        # Zero blocks: return the empty accumulator; the contiguous branch
        # below would otherwise index block_lengths[0] and raise.
        return out, pair_to_idx
    if _block_ids_are_contiguous(block_ids, block_lengths):
        starts = np.r_[0, np.cumsum(block_lengths[:-1])]
        for b_idx, start in enumerate(starts):
            end = int(start + block_lengths[b_idx])
            h = _het_matrix(afs[int(start):end])
            out[b_idx] = h.T @ h
    else:
        ids = np.asarray(block_ids, dtype=np.int64)
        for b_idx in range(n_blocks):
            h = _het_matrix(afs[ids == b_idx])
            out[b_idx] = h.T @ h
    return out, pair_to_idx
def f2(
    data: Union[Any, Tuple[np.ndarray, np.ndarray, List[str]]],
    pop1: Optional[Sequence[str]] = None,
    pop2: Optional[Sequence[str]] = None,
    sample_labels: Optional[Sequence[str]] = None,
    apply_correction: bool = True,
    block_size: int = 5000,
    blocks: Optional[np.ndarray] = None,
    ancestry: Optional[Union[str, int]] = None,
    laiobj: Optional[Any] = None,
    include_self: bool = False,
    pseudohaploid: Union[bool, int] = False,
) -> pd.DataFrame:
    """
    Compute f2-statistics with block-jackknife standard errors.

    Args:
        data: Either a SNPObject or a tuple (afs, counts, pops), where `afs` and `counts` are arrays of
            shape (n_snps, n_pops). If a SNPObject is provided, `sample_labels` are used to aggregate samples to populations.
        pop1, pop2: Populations to compute f2 for. If both are None, `include_self=False`
            computes only off-diagonal pairs i<j; `include_self=True` computes all pairs
            including diagonals i<=j.
        sample_labels: Population label per sample (aligned with SNPObject.samples) when `data` is a SNPObject.
            If omitted, labels are inferred from ``SNPObject.sample_fid`` when it differs from ``samples`` (PLINK FID);
            otherwise each ``samples`` value is used as its own group label.
        apply_correction: Apply small-sample correction p*(1-p)/(n-1) per population.
            When True, SNPs with n<=1 in either population are excluded at that SNP.
        block_size: Number of SNPs per jackknife block (default 5000 SNPs). Ignored if `blocks` is provided.
        blocks: Optional explicit block id per SNP. If provided, overrides `block_size`.
        ancestry: Optional ancestry code to mask genotypes to a specific ancestry before aggregation. Requires LAI.
        laiobj: Optional `LocalAncestryObject` used to derive SNP-level LAI if it is not already present on the SNPObject.
        pseudohaploid: If True, detects pseudo-haploid samples (samples with no heterozygotes in the first 1000 SNPs)
            and treats them as haploid. If an integer `n` is provided, checks the first `n` SNPs.
            If False, treats all samples as diploid.

    Returns:
        Pandas DataFrame with columns: pop1, pop2, est, se, z, p, n_blocks, n_snps
    """
    afs, counts, pops = _prepare_inputs(data, sample_labels, ancestry=ancestry, laiobj=laiobj, pseudohaploid=pseudohaploid)
    n_snps, n_pops = afs.shape
    block_ids, block_lengths = _build_blocks(n_snps, blocks, block_size)
    n_blocks = block_lengths.size
    # Resolve which (pop1, pop2) pairs to evaluate.
    if pop1 is None and pop2 is None:
        if include_self:
            pop_pairs = [(pops[i], pops[j]) for i in range(n_pops) for j in range(i, n_pops)]
        else:
            pop_pairs = [(pops[i], pops[j]) for i in range(n_pops) for j in range(i + 1, n_pops)]
    else:
        if pop1 is None or pop2 is None:
            raise ValueError("Both pop1 and pop2 must be provided when one is provided")
        if len(pop1) != len(pop2):
            raise ValueError("pop1 and pop2 must have the same length")
        pop_pairs = list(zip(pop1, pop2))
    name_to_idx = {p: i for i, p in enumerate(pops)}
    rows: List[Dict[str, Union[str, float, int]]] = []
    # Fast path: with complete data the block sums of p_i*p_j are shared across
    # every requested pair, avoiding a per-pair pass over the SNPs.
    fast = _complete_block_product_sums(
        afs,
        counts,
        block_ids,
        block_lengths,
        require_counts_gt=1 if apply_correction else 0,
        need_correction=apply_correction,
    )
    if fast is not None:
        pair_sums, corr_sums, den_block_sums = fast
        for p1, p2 in pop_pairs:
            i = name_to_idx[p1]
            j = name_to_idx[p2]
            # sum over SNPs of (p_i - p_j)^2, expanded into pairwise products.
            num_block_sums = pair_sums[:, i, i] + pair_sums[:, j, j] - 2.0 * pair_sums[:, i, j]
            if apply_correction:
                assert corr_sums is not None
                # Subtract the small-sample terms p*(1-p)/(n-1) for both pops.
                num_block_sums = num_block_sums - corr_sums[:, i] - corr_sums[:, j]
            res = _jackknife_ratio_from_block_sums(num_block_sums, den_block_sums)
            rows.append(
                {
                    "pop1": p1,
                    "pop2": p2,
                    "est": res.est,
                    "se": res.se,
                    "z": res.z,
                    "p": res.p,
                    "n_blocks": res.n_blocks,
                    "n_snps": res.n_snps,
                }
            )
        return pd.DataFrame(rows)
    # Slow path: per-SNP masking with incomplete data.
    # Precompute per-block index lists for efficiency
    block_bins = [np.where(block_ids == b)[0] for b in range(n_blocks)]
    for p1, p2 in pop_pairs:
        i = name_to_idx[p1]
        j = name_to_idx[p2]
        p_i = afs[:, i]
        p_j = afs[:, j]
        n_i = counts[:, i].astype(float)
        n_j = counts[:, j].astype(float)
        # per-SNP f2 with optional small-sample correction.
        # If apply_correction is True, SNPs with n<=1 for either population are excluded.
        with np.errstate(divide="ignore", invalid="ignore"):
            num = (p_i - p_j) ** 2
            if apply_correction:
                # NaN marks SNPs whose correction is undefined (n<=1); they
                # propagate into `num` and are dropped by the mask below.
                corr_i = np.where(n_i > 1, (p_i * (1.0 - p_i)) / (n_i - 1.0), np.nan)
                corr_j = np.where(n_j > 1, (p_j * (1.0 - p_j)) / (n_j - 1.0), np.nan)
                num = num - corr_i - corr_j
        # SNPs contribute only if the correction was defined (finite) and AFs present
        if apply_correction:
            snp_mask = np.isfinite(num)
        else:
            snp_mask = np.isfinite(num) & (n_i > 0) & (n_j > 0)
        # Aggregate per block: sums of num and counts. Use NaN to mark empty blocks.
        num_block_sums = np.full(n_blocks, np.nan, dtype=float)
        den_block_sums = np.full(n_blocks, np.nan, dtype=float)
        for b, idx in enumerate(block_bins):
            if idx.size == 0:
                continue
            idx2 = idx[snp_mask[idx]]
            if idx2.size == 0:
                continue
            num_block_sums[b] = float(np.nansum(num[idx2]))
            den_block_sums[b] = float(idx2.size)
        res = _jackknife_ratio_from_block_sums(num_block_sums, den_block_sums)
        rows.append(
            {
                "pop1": p1,
                "pop2": p2,
                "est": res.est,
                "se": res.se,
                "z": res.z,
                "p": res.p,
                "n_blocks": res.n_blocks,
                "n_snps": res.n_snps,
            }
        )
    return pd.DataFrame(rows)
def f3(
    data: Union[Any, Tuple[np.ndarray, np.ndarray, List[str]]],
    target: Optional[Sequence[str]] = None,
    ref1: Optional[Sequence[str]] = None,
    ref2: Optional[Sequence[str]] = None,
    sample_labels: Optional[Sequence[str]] = None,
    apply_correction: bool = True,
    block_size: int = 5000,
    blocks: Optional[np.ndarray] = None,
    ancestry: Optional[Union[str, int]] = None,
    laiobj: Optional[Any] = None,
    pseudohaploid: Union[bool, int] = False,
) -> pd.DataFrame:
    """
    Compute f3-statistics f3(target; ref1, ref2) with block-jackknife SE.

    - `block_size` is the number of SNPs per jackknife block (default 5000 SNPs). Ignored if `blocks` is provided.
    - If `target`, `ref1`, and `ref2` are all None, compute f3 for all combinations where each role can be any population.
    - If `ancestry` is provided, genotypes will be masked to the specified ancestry using LAI before aggregation.
    - If `apply_correction` is True, subtract the finite sample term p_t*(1-p_t)/(n_t-1) from the per-SNP product.
      When True, SNPs with n_t<=1 are excluded.
    - If `sample_labels` is omitted for a SNPObject, defaults match ``f2`` (PLINK ``sample_fid`` when FID differs from IID).
    - `pseudohaploid`: If True, detects and treats pseudo-haploid samples as haploid. If int `n`, checks first `n` SNPs. If False, treats all as diploid.

    Returns:
        Pandas DataFrame with columns: target, ref1, ref2, est, se, z, p, n_blocks, n_snps
    """
    afs, counts, pops = _prepare_inputs(data, sample_labels, ancestry=ancestry, laiobj=laiobj, pseudohaploid=pseudohaploid)
    n_snps, _ = afs.shape
    block_ids, block_lengths = _build_blocks(n_snps, blocks, block_size)
    n_blocks = block_lengths.size
    # Resolve which (target, ref1, ref2) triples to evaluate.
    if target is None and ref1 is None and ref2 is None:
        triples = [(a, b, c) for a in pops for b in pops for c in pops]
    else:
        if target is None or ref1 is None or ref2 is None:
            raise ValueError("target, ref1, and ref2 must all be provided if any is provided")
        if not (len(target) == len(ref1) == len(ref2)):
            raise ValueError("target, ref1, ref2 must have the same length")
        triples = list(zip(target, ref1, ref2))
    name_to_idx = {p: i for i, p in enumerate(pops)}
    rows: List[Dict[str, Union[str, float, int]]] = []
    # Fast path: shared block sums of p_i*p_j across all requested triples.
    fast = _complete_block_product_sums(
        afs,
        counts,
        block_ids,
        block_lengths,
        require_counts_gt=1 if apply_correction else 0,
        need_correction=apply_correction,
    )
    if fast is not None:
        pair_sums, corr_sums, den_block_sums = fast
        for t, r1, r2 in triples:
            it = name_to_idx[t]
            i1 = name_to_idx[r1]
            i2 = name_to_idx[r2]
            # sum over SNPs of (p_t - p_1)(p_t - p_2), expanded into products.
            num_block_sums = (
                pair_sums[:, it, it]
                - pair_sums[:, it, i1]
                - pair_sums[:, it, i2]
                + pair_sums[:, i1, i2]
            )
            if apply_correction:
                assert corr_sums is not None
                # Subtract the target's small-sample term p_t*(1-p_t)/(n_t-1).
                num_block_sums = num_block_sums - corr_sums[:, it]
            res = _jackknife_ratio_from_block_sums(num_block_sums, den_block_sums)
            rows.append(
                {
                    "target": t,
                    "ref1": r1,
                    "ref2": r2,
                    "est": res.est,
                    "se": res.se,
                    "z": res.z,
                    "p": res.p,
                    "n_blocks": res.n_blocks,
                    "n_snps": res.n_snps,
                }
            )
        return pd.DataFrame(rows)
    # Slow path: per-SNP masking with incomplete data.
    block_bins = [np.where(block_ids == b)[0] for b in range(n_blocks)]
    for t, r1, r2 in triples:
        it = name_to_idx[t]
        i1 = name_to_idx[r1]
        i2 = name_to_idx[r2]
        pt = afs[:, it]
        p1 = afs[:, i1]
        p2 = afs[:, i2]
        nt = counts[:, it].astype(float)
        n1 = counts[:, i1].astype(float)
        n2 = counts[:, i2].astype(float)
        with np.errstate(invalid="ignore", divide="ignore"):
            num = (pt - p1) * (pt - p2)
            if apply_correction:
                # NaN marks SNPs with an undefined target correction (n_t<=1);
                # they propagate into `num` and are dropped by the mask below.
                corr_t = np.where(nt > 1, (pt * (1.0 - pt)) / (nt - 1.0), np.nan)
                num = num - corr_t
        if apply_correction:
            # Require a valid correction on the target, references can be n>0
            snp_mask = np.isfinite(num) & (n1 > 0) & (n2 > 0)
        else:
            snp_mask = np.isfinite(num) & (nt > 0) & (n1 > 0) & (n2 > 0)
        # Aggregate per block; NaN marks blocks with no contributing SNPs.
        num_block_sums = np.full(n_blocks, np.nan, dtype=float)
        den_block_sums = np.full(n_blocks, np.nan, dtype=float)
        for b, idx in enumerate(block_bins):
            if idx.size == 0:
                continue
            idx2 = idx[snp_mask[idx]]
            if idx2.size == 0:
                continue
            num_block_sums[b] = float(np.nansum(num[idx2]))
            den_block_sums[b] = float(idx2.size)
        res = _jackknife_ratio_from_block_sums(num_block_sums, den_block_sums)
        rows.append(
            {
                "target": t,
                "ref1": r1,
                "ref2": r2,
                "est": res.est,
                "se": res.se,
                "z": res.z,
                "p": res.p,
                "n_blocks": res.n_blocks,
                "n_snps": res.n_snps,
            }
        )
    return pd.DataFrame(rows)
def f4(
    data: Union[Any, Tuple[np.ndarray, np.ndarray, List[str]]],
    a: Optional[Sequence[str]] = None,
    b: Optional[Sequence[str]] = None,
    c: Optional[Sequence[str]] = None,
    d: Optional[Sequence[str]] = None,
    sample_labels: Optional[Sequence[str]] = None,
    block_size: int = 5000,
    blocks: Optional[np.ndarray] = None,
    ancestry: Optional[Union[str, int]] = None,
    laiobj: Optional[Any] = None,
    pseudohaploid: Union[bool, int] = False,
) -> pd.DataFrame:
    """
    Compute f4-statistics f4(a, b; c, d) with block-jackknife SE.

    - `block_size` is the number of SNPs per jackknife block (default 5000 SNPs). Ignored if `blocks` is provided.
    - If `ancestry` is provided, genotypes will be masked to the specified ancestry using LAI before aggregation.
    - If `a`, `b`, `c`, and `d` are all None, compute f4 for all ordered quadruples of populations.
    - `pseudohaploid`: If True, detects and treats pseudo-haploid samples as haploid. If int `n`, checks first `n` SNPs. If False, treats all as diploid.

    Returns:
        Pandas DataFrame with columns: a, b, c, d, est, se, z, p, n_blocks, n_snps
    """
    afs, counts, pops = _prepare_inputs(data, sample_labels, ancestry=ancestry, laiobj=laiobj, pseudohaploid=pseudohaploid)
    n_snps, _ = afs.shape
    block_ids, block_lengths = _build_blocks(n_snps, blocks, block_size)
    n_blocks = block_lengths.size
    # Resolve which (a, b, c, d) quadruples to evaluate.
    if a is None and b is None and c is None and d is None:
        quads = [(w, x, y, z) for w in pops for x in pops for y in pops for z in pops]
    else:
        if a is None or b is None or c is None or d is None:
            raise ValueError("a, b, c, d must all be provided if any is provided")
        if not (len(a) == len(b) == len(c) == len(d)):
            raise ValueError("a, b, c, d must have the same length")
        quads = list(zip(a, b, c, d))
    name_to_idx = {p: i for i, p in enumerate(pops)}
    rows: List[Dict[str, Union[str, float, int]]] = []
    # Fast path: shared block sums of p_i*p_j across all requested quadruples;
    # f4 needs no small-sample correction term.
    fast = _complete_block_product_sums(
        afs,
        counts,
        block_ids,
        block_lengths,
        require_counts_gt=0,
        need_correction=False,
    )
    if fast is not None:
        pair_sums, _, den_block_sums = fast
        for pa, pb, pc, dpop in quads:
            ia = name_to_idx[pa]
            ib = name_to_idx[pb]
            ic = name_to_idx[pc]
            id_ = name_to_idx[dpop]
            # sum over SNPs of (p_a - p_b)(p_c - p_d), expanded into products.
            num_block_sums = (
                pair_sums[:, ia, ic]
                - pair_sums[:, ia, id_]
                - pair_sums[:, ib, ic]
                + pair_sums[:, ib, id_]
            )
            res = _jackknife_ratio_from_block_sums(num_block_sums, den_block_sums)
            rows.append(
                {
                    "a": pa,
                    "b": pb,
                    "c": pc,
                    "d": dpop,
                    "est": res.est,
                    "se": res.se,
                    "z": res.z,
                    "p": res.p,
                    "n_blocks": res.n_blocks,
                    "n_snps": res.n_snps,
                }
            )
        return pd.DataFrame(rows)
    # Slow path: per-SNP masking with incomplete data.
    block_bins = [np.where(block_ids == b)[0] for b in range(n_blocks)]
    for pa, pb, pc, dpop in quads:
        ia = name_to_idx[pa]
        ib = name_to_idx[pb]
        ic = name_to_idx[pc]
        id_ = name_to_idx[dpop]
        A = afs[:, ia]
        B = afs[:, ib]
        C = afs[:, ic]
        D = afs[:, id_]
        na = counts[:, ia].astype(float)
        nb = counts[:, ib].astype(float)
        nc = counts[:, ic].astype(float)
        nd = counts[:, id_].astype(float)
        with np.errstate(invalid="ignore"):
            num = (A - B) * (C - D)
        # A SNP contributes only when all four populations have data.
        snp_mask = np.isfinite(num) & (na > 0) & (nb > 0) & (nc > 0) & (nd > 0)
        # Aggregate per block; NaN marks blocks with no contributing SNPs.
        num_block_sums = np.full(n_blocks, np.nan, dtype=float)
        den_block_sums = np.full(n_blocks, np.nan, dtype=float)
        for b_idx, idx in enumerate(block_bins):
            if idx.size == 0:
                continue
            idx2 = idx[snp_mask[idx]]
            if idx2.size == 0:
                continue
            num_block_sums[b_idx] = float(np.nansum(num[idx2]))
            den_block_sums[b_idx] = float(idx2.size)
        res = _jackknife_ratio_from_block_sums(num_block_sums, den_block_sums)
        rows.append(
            {
                "a": pa,
                "b": pb,
                "c": pc,
                "d": dpop,
                "est": res.est,
                "se": res.se,
                "z": res.z,
                "p": res.p,
                "n_blocks": res.n_blocks,
                "n_snps": res.n_snps,
            }
        )
    return pd.DataFrame(rows)
def d_stat(
    data: Union[Any, Tuple[np.ndarray, np.ndarray, List[str]]],
    a: Optional[Sequence[str]] = None,
    b: Optional[Sequence[str]] = None,
    c: Optional[Sequence[str]] = None,
    d: Optional[Sequence[str]] = None,
    sample_labels: Optional[Sequence[str]] = None,
    block_size: int = 5000,
    blocks: Optional[np.ndarray] = None,
    ancestry: Optional[Union[str, int]] = None,
    laiobj: Optional[Any] = None,
    pseudohaploid: Union[bool, int] = False,
) -> pd.DataFrame:
    """
    Compute D-statistics D(a, b; c, d) with a delete-one block-jackknife SE.

    The estimator is the ratio of sums
    `D = sum_l (A-B)(C-D) / sum_l (A+B-2AB)(C+D-2CD)`
    over SNPs l, where A..D are the population allele frequencies.

    Notes:
        - `block_size` is the number of SNPs per jackknife block (default 5000 SNPs).
          Ignored if `blocks` is provided.
        - If `ancestry` is provided, genotypes will be masked to the specified
          ancestry using LAI before aggregation.
        - `pseudohaploid`: If True, detects and treats pseudo-haploid samples as
          haploid. If int `n`, checks first `n` SNPs. If False, treats all as diploid.
        - If `a`..`d` are all None, every ordered quadruple of populations is tested.

    Returns:
        DataFrame with one row per quadruple: a, b, c, d, est, se, z, p,
        n_blocks, n_snps.
    """
    freqs, hap_counts, pops = _prepare_inputs(
        data, sample_labels, ancestry=ancestry, laiobj=laiobj, pseudohaploid=pseudohaploid
    )
    n_snps, _ = freqs.shape
    block_ids, block_lengths = _build_blocks(n_snps, blocks, block_size)
    n_blocks = block_lengths.size

    if a is None and b is None and c is None and d is None:
        # Exhaustive ordered quadruples over all populations.
        quartets = [(w, x, y, z) for w in pops for x in pops for y in pops for z in pops]
    else:
        if a is None or b is None or c is None or d is None:
            raise ValueError("a, b, c, d must all be provided if any is provided")
        if not (len(a) == len(b) == len(c) == len(d)):
            raise ValueError("a, b, c, d must have the same length")
        quartets = list(zip(a, b, c, d))

    pop_index = {name: k for k, name in enumerate(pops)}
    bids = np.asarray(block_ids, dtype=np.int64)
    records: List[Dict[str, Union[str, float, int]]] = []
    # Resolve population names to column indices once, up front.
    resolved = [
        (pa, pb, pc, pd_, pop_index[pa], pop_index[pb], pop_index[pc], pop_index[pd_])
        for pa, pb, pc, pd_ in quartets
    ]

    # Fast path: reuse precomputed per-block pairwise product sums (complete data).
    fast_products = _complete_block_product_sums(
        freqs,
        hap_counts,
        block_ids,
        block_lengths,
        require_counts_gt=0,
        need_correction=False,
    )
    den_pairs = [(ia, ib) for _, _, _, _, ia, ib, _, _ in resolved]
    den_pairs.extend((ic, id_) for _, _, _, _, _, _, ic, id_ in resolved)
    fast_den = _complete_block_pair_het_product_sums(freqs, hap_counts, block_ids, block_lengths, den_pairs)
    if fast_products is not None and fast_den is not None:
        pair_sums, _, _ = fast_products
        hetprod_sums, pair_to_idx = fast_den
        total_snps = int(block_lengths.sum())

        def f4_block_sums(i: int, j: int, k: int, l: int) -> np.ndarray:
            # Per-block sums of (p_i - p_j)(p_k - p_l), expanded into pair products.
            return (
                pair_sums[:, i, k]
                - pair_sums[:, i, l]
                - pair_sums[:, j, k]
                + pair_sums[:, j, l]
            )

        for pa, pb, pc, pd_, ia, ib, ic, id_ in resolved:
            num_sums = f4_block_sums(ia, ib, ic, id_)
            key_ab = pair_to_idx[(min(ia, ib), max(ia, ib))]
            key_cd = pair_to_idx[(min(ic, id_), max(ic, id_))]
            den_sums = hetprod_sums[:, key_ab, key_cd].copy()
            # Drop numerically-zero denominators for jackknife stability.
            den_sums[np.isfinite(den_sums) & (np.abs(den_sums) <= 1e-12)] = np.nan
            res = _jackknife_ratio_from_block_sums(num_sums, den_sums)
            records.append(
                {
                    "a": pa,
                    "b": pb,
                    "c": pc,
                    "d": pd_,
                    "est": res.est,
                    "se": res.se,
                    "z": res.z,
                    "p": res.p,
                    "n_blocks": res.n_blocks,
                    "n_snps": total_snps,
                }
            )
        return pd.DataFrame(records)

    # Fallback: per-quadruple masking, per-block aggregation via bincount.
    for pa, pb, pc, pd_, ia, ib, ic, id_ in resolved:
        A, B, C, D = freqs[:, ia], freqs[:, ib], freqs[:, ic], freqs[:, id_]
        nA = hap_counts[:, ia].astype(float)
        nB = hap_counts[:, ib].astype(float)
        nC = hap_counts[:, ic].astype(float)
        nD = hap_counts[:, id_].astype(float)
        with np.errstate(invalid="ignore"):
            num_snp = (A - B) * (C - D)
            den_snp = (A + B - 2 * A * B) * (C + D - 2 * C * D)
        # SNPs need finite num/den and observed haplotypes in all four pops.
        keep = (
            np.isfinite(num_snp)
            & np.isfinite(den_snp)
            & (nA > 0)
            & (nB > 0)
            & (nC > 0)
            & (nD > 0)
        )
        num_sel = np.where(keep, num_snp.astype(np.float64, copy=False), 0.0)
        den_sel = np.where(keep, den_snp.astype(np.float64, copy=False), 0.0)
        num_sums = np.bincount(bids, weights=num_sel, minlength=n_blocks).astype(np.float64, copy=False)
        den_sums = np.bincount(bids, weights=den_sel, minlength=n_blocks).astype(np.float64, copy=False)
        used = np.bincount(bids, weights=keep.astype(np.int64), minlength=n_blocks)
        # Blocks with no usable SNP (and near-zero denominators) become NaN so
        # the jackknife helper can skip them.
        no_data = used == 0
        num_sums[no_data] = np.nan
        den_sums[no_data] = np.nan
        den_sums[np.isfinite(den_sums) & (np.abs(den_sums) <= 1e-12)] = np.nan
        res = _jackknife_ratio_from_block_sums(num_sums, den_sums)
        records.append(
            {
                "a": pa,
                "b": pb,
                "c": pc,
                "d": pd_,
                "est": res.est,
                "se": res.se,
                "z": res.z,
                "p": res.p,
                "n_blocks": res.n_blocks,
                "n_snps": int(used.sum()),
            }
        )
    return pd.DataFrame(records)
# [docs]  (Sphinx source-link artifact from HTML extraction; kept as a comment)
def f4_ratio(
    data: Union[Any, Tuple[np.ndarray, np.ndarray, List[str]]],
    num: Sequence[Tuple[str, str, str, str]],
    den: Sequence[Tuple[str, str, str, str]],
    sample_labels: Optional[Sequence[str]] = None,
    block_size: int = 5000,
    blocks: Optional[np.ndarray] = None,
    ancestry: Optional[Union[str, int]] = None,
    laiobj: Optional[Any] = None,
    pseudohaploid: Union[bool, int] = False,
) -> pd.DataFrame:
    """
    Compute f4-ratio statistics (ratio of two f4-statistics) with block-jackknife SE.

    Args:
        num: Sequence of quadruples (a, b, c, d) for the numerator f4(a, b; c, d).
        den: Sequence of quadruples (a, b, c, d) for the denominator f4(a, b; c, d);
            must be parallel to `num`.

    Notes:
        - `block_size` is the number of SNPs per jackknife block (default 5000 SNPs).
          Ignored if `blocks` is provided.
        - If `ancestry` is provided, genotypes will be masked to the specified
          ancestry using LAI before aggregation.
        - `pseudohaploid`: If True, detects and treats pseudo-haploid samples as
          haploid. If int `n`, checks first `n` SNPs. If False, treats all as diploid.

    Returns:
        DataFrame with columns num, den, est, se, z, p, n_blocks, n_snps.
    """
    if len(num) != len(den):
        raise ValueError("'num' and 'den' must have the same length")
    freqs, hap_counts, pops = _prepare_inputs(
        data, sample_labels, ancestry=ancestry, laiobj=laiobj, pseudohaploid=pseudohaploid
    )
    n_snps, _ = freqs.shape
    block_ids, block_lengths = _build_blocks(n_snps, blocks, block_size)
    n_blocks = block_lengths.size
    pop_index = {name: k for k, name in enumerate(pops)}
    records: List[Dict[str, Union[str, float, int]]] = []

    # Fast path: precomputed per-block pairwise product sums (complete data).
    fast = _complete_block_product_sums(
        freqs,
        hap_counts,
        block_ids,
        block_lengths,
        require_counts_gt=0,
        need_correction=False,
    )
    if fast is not None:
        pair_sums, _, _ = fast

        def f4_block_sums(i: int, j: int, k: int, l: int) -> np.ndarray:
            # Per-block sums of (p_i - p_j)(p_k - p_l) from pairwise product sums.
            return (
                pair_sums[:, i, k]
                - pair_sums[:, i, l]
                - pair_sums[:, j, k]
                + pair_sums[:, j, l]
            )

        for (qa, qb, qc, qd), (ra, rb, rc, rd) in zip(num, den):
            top = f4_block_sums(pop_index[qa], pop_index[qb], pop_index[qc], pop_index[qd])
            bot = f4_block_sums(pop_index[ra], pop_index[rb], pop_index[rc], pop_index[rd])
            res = _weighted_jackknife_ratio_from_block_sums(top, bot, block_lengths)
            records.append(
                {
                    "num": f"({qa},{qb};{qc},{qd})",
                    "den": f"({ra},{rb};{rc},{rd})",
                    "est": res.est,
                    "se": res.se,
                    "z": res.z,
                    "p": res.p,
                    "n_blocks": res.n_blocks,
                    "n_snps": res.n_snps,
                }
            )
        return pd.DataFrame(records)

    # Fallback: explicit per-block aggregation with per-SNP validity masks.
    block_members = [np.flatnonzero(block_ids == b) for b in range(n_blocks)]
    for (qa, qb, qc, qd), (ra, rb, rc, rd) in zip(num, den):
        top_idx = [pop_index[p] for p in (qa, qb, qc, qd)]
        bot_idx = [pop_index[p] for p in (ra, rb, rc, rd)]
        with np.errstate(invalid="ignore"):
            top_snp = (freqs[:, top_idx[0]] - freqs[:, top_idx[1]]) * (freqs[:, top_idx[2]] - freqs[:, top_idx[3]])
            bot_snp = (freqs[:, bot_idx[0]] - freqs[:, bot_idx[1]]) * (freqs[:, bot_idx[2]] - freqs[:, bot_idx[3]])
        # Keep SNPs with finite values and observed haplotypes in all eight slots.
        keep = np.isfinite(top_snp) & np.isfinite(bot_snp)
        for k in top_idx:
            keep &= hap_counts[:, k] > 0
        for k in bot_idx:
            keep &= hap_counts[:, k] > 0
        top_sums = np.full(n_blocks, np.nan, dtype=float)
        bot_sums = np.full(n_blocks, np.nan, dtype=float)
        used = np.zeros(n_blocks, dtype=int)
        for b_idx, members in enumerate(block_members):
            if members.size == 0:
                continue
            sel = members[keep[members]]
            if sel.size == 0:
                continue
            top_sums[b_idx] = float(np.nansum(top_snp[sel]))
            bot_sums[b_idx] = float(np.nansum(bot_snp[sel]))
            used[b_idx] = int(sel.size)
        res = _weighted_jackknife_ratio_from_block_sums(top_sums, bot_sums, used)
        records.append(
            {
                "num": f"({qa},{qb};{qc},{qd})",
                "den": f"({ra},{rb};{rc},{rd})",
                "est": res.est,
                "se": res.se,
                "z": res.z,
                "p": res.p,
                "n_blocks": res.n_blocks,
                "n_snps": res.n_snps,
            }
        )
    return pd.DataFrame(records)
# [docs]  (Sphinx source-link artifact from HTML extraction; kept as a comment)
def fst(
    data: Union[Any, Tuple[np.ndarray, np.ndarray, List[str]]],
    pop1: Optional[Sequence[str]] = None,
    pop2: Optional[Sequence[str]] = None,
    *,
    method: str = "hudson",
    sample_labels: Optional[Sequence[str]] = None,
    block_size: int = 5000,
    blocks: Optional[np.ndarray] = None,
    ancestry: Optional[Union[str, int]] = None,
    laiobj: Optional[Any] = None,
    include_self: bool = False,
    pseudohaploid: Union[bool, int] = False,
) -> pd.DataFrame:
    """
    Pairwise F_ST with delete-one block jackknife SE.

    Methods:
        `hudson`:
            Ratio-of-averages following Hudson 1992 / Bhatia 2013. Uses
            `num = d_xy - 0.5*(pi_x + pi_y)` and `den = d_xy`, where
            `d_xy = p_x*(1-p_y) + p_y*(1-p_x)` and `pi_x = 2*p_x*(1-p_x)*n_x/(n_x-1)`.
        `weir_cockerham`:
            Weir and Cockerham's theta for two populations. Computes per-SNP
            variance components `a`, `b`, and `c`, then uses a ratio-of-sums
            jackknife with `num = a` and `den = a + b + c`.

    Args:
        data: SNPObject or a tuple `(afs, counts, pops)` — same inputs as f2/f3/f4.
        pop1, pop2: Parallel sequences naming the population pairs to compare.
            If both are None, all unordered pairs are evaluated
            (`include_self=True` additionally includes each population with itself).
        method: 'hudson' or 'weir_cockerham' (hyphens/case are normalized).
        block_size: SNPs per jackknife block; ignored if `blocks` is provided.
        blocks: Optional precomputed per-SNP block labels.
        ancestry / laiobj: If given, genotypes are masked to this ancestry using
            LAI before aggregation.
        pseudohaploid: If True, detects and treats pseudo-haploid samples as
            haploid. If int `n`, checks first `n` SNPs. If False, treats all as diploid.

    Returns:
        DataFrame with columns pop1, pop2, method, est, se, z, p, n_blocks, n_snps.

    Notes:
        * For WC we use expected heterozygosity h_i = 2 p_i (1 - p_i) from allele freqs.
        * SNPs with n<=1 in either pop or with invalid denominators are ignored.
    """
    # Normalize the method name so 'Weir-Cockerham' etc. are accepted.
    method = str(method).strip().lower().replace("-", "_")
    if method not in {"hudson", "weir_cockerham"}:
        raise ValueError("method must be 'hudson' or 'weir_cockerham'")
    afs, counts, pops = _prepare_inputs(data, sample_labels, ancestry=ancestry, laiobj=laiobj, pseudohaploid=pseudohaploid)
    n_snps, n_pops = afs.shape
    block_ids, block_lengths = _build_blocks(n_snps, blocks, block_size)
    n_blocks = block_lengths.size
    # Build the list of population pairs to evaluate.
    if pop1 is None and pop2 is None:
        if include_self:
            pairs = [(pops[i], pops[j]) for i in range(n_pops) for j in range(i, n_pops)]
        else:
            pairs = [(pops[i], pops[j]) for i in range(n_pops) for j in range(i + 1, n_pops)]
    else:
        if pop1 is None or pop2 is None or len(pop1) != len(pop2):
            raise ValueError("pop1 and pop2 must both be provided and of equal length")
        pairs = list(zip(pop1, pop2))
    name_to_idx = {p: i for i, p in enumerate(pops)}
    out_rows: List[Dict[str, Union[str, float, int]]] = []
    if method == "hudson":
        # Fast path (Hudson only): precomputed per-block sums; the helper
        # returns None when the precomputation is not applicable.
        fast = _complete_block_hudson_fst_sums(afs, counts, block_ids, block_lengths)
        if fast is not None:
            af_sums, pair_sums, pi_sums = fast
            for pA, pB in pairs:
                i = name_to_idx[pA]
                j = name_to_idx[pB]
                # sum d_xy = sum p_i + sum p_j - 2 * sum p_i*p_j per block.
                den_block_sums = af_sums[:, i] + af_sums[:, j] - 2.0 * pair_sums[:, i, j]
                num_block_sums = den_block_sums - 0.5 * (pi_sums[:, i] + pi_sums[:, j])
                res = _jackknife_block_ratio_estimates(
                    num_block_sums,
                    den_block_sums,
                    block_lengths,
                )
                out_rows.append(
                    {
                        "pop1": pA,
                        "pop2": pB,
                        "method": method,
                        "est": res.est,
                        "se": res.se,
                        "z": res.z,
                        "p": res.p,
                        "n_blocks": res.n_blocks,
                        # NOTE(review): fast path reports the total SNP count,
                        # while the fallback below reports only masked-in SNPs.
                        "n_snps": n_snps,
                    }
                )
            return pd.DataFrame(out_rows)
    # Fallback: per-pair masking with an explicit per-block aggregation loop.
    block_bins = [np.where(block_ids == b)[0] for b in range(n_blocks)]
    for pA, pB in pairs:
        i = name_to_idx[pA]
        j = name_to_idx[pB]
        p1 = afs[:, i].astype(float)
        p2 = afs[:, j].astype(float)
        n1 = counts[:, i].astype(float)
        n2 = counts[:, j].astype(float)
        valid = np.isfinite(p1) & np.isfinite(p2)
        if method == "hudson":
            # d_xy and within-pop diversities (unbiased, haplotype-based)
            d_xy = p1 * (1.0 - p2) + p2 * (1.0 - p1)
            with np.errstate(divide="ignore", invalid="ignore"):
                # n/(n-1) is the small-sample (unbiased) correction; n==1 yields inf
                # and is removed by the mask below.
                pi1 = 2.0 * p1 * (1.0 - p1) * (n1 / (n1 - 1.0))
                pi2 = 2.0 * p2 * (1.0 - p2) * (n2 / (n2 - 1.0))
            num_snp = d_xy - 0.5 * (pi1 + pi2)
            den_snp = d_xy
            snp_mask = valid & (n1 > 1) & (n2 > 1) & np.isfinite(num_snp) & np.isfinite(den_snp)
        else:
            # Weir-Cockerham θ components (r=2)
            n = n1 + n2
            with np.errstate(divide="ignore", invalid="ignore"):
                n_bar = n / 2.0
                # Weighted mean allele frequency across the two pops.
                p_bar = np.where(n > 0, (n1 * p1 + n2 * p2) / n, np.nan)
                # Sample variance of allele frequency over pops (r-1 == 1).
                s2 = (n1 * (p1 - p_bar) ** 2 + n2 * (p2 - p_bar) ** 2) / n_bar
                h1 = 2.0 * p1 * (1.0 - p1)
                h2 = 2.0 * p2 * (1.0 - p2)
                h_bar = 0.5 * (h1 + h2)
                n_c = n - (n1 * n1 + n2 * n2) / np.where(n > 0, n, np.nan)  # == 2*n1*n2/n
                # components
                a = (n_bar / n_c) * (s2 - (p_bar * (1.0 - p_bar) - 0.5 * s2 - 0.25 * h_bar) / (n_bar - 1.0))
                b = (n_bar / (n_bar - 1.0)) * (p_bar * (1.0 - p_bar) - 0.5 * s2 - ((2.0 * n_bar - 1.0) / (4.0 * n_bar)) * h_bar)
                c = 0.5 * h_bar
            num_snp = a
            den_snp = a + b + c
            # Need at least 2 haplotypes per pop and well-defined denominators
            snp_mask = valid & (n1 > 1) & (n2 > 1) & np.isfinite(num_snp) & np.isfinite(den_snp)
        # Aggregate by blocks
        num_block_sums = np.full(n_blocks, np.nan, dtype=float)
        den_block_sums = np.full(n_blocks, np.nan, dtype=float)
        ct_block_sums = np.zeros(n_blocks, dtype=int)
        for b_idx, idx in enumerate(block_bins):
            if idx.size == 0:
                continue
            idx2 = idx[snp_mask[idx]]
            if idx2.size == 0:
                continue
            ns = float(np.nansum(num_snp[idx2]))
            ds = float(np.nansum(den_snp[idx2]))
            # drop numerically-zero denominators for stability
            den_block_sums[b_idx] = ds if abs(ds) > 1e-12 else np.nan
            num_block_sums[b_idx] = ns
            ct_block_sums[b_idx] = int(idx2.size)
        # Hudson uses the count-weighted jackknife; WC the plain ratio-of-sums one.
        if method == "hudson":
            res = _jackknife_block_ratio_estimates(num_block_sums, den_block_sums, ct_block_sums)
        else:
            res = _jackknife_ratio_from_block_sums(num_block_sums, den_block_sums)
        out_rows.append(
            {
                "pop1": pA,
                "pop2": pB,
                "method": method,
                "est": res.est,
                "se": res.se,
                "z": res.z,
                "p": res.p,
                "n_blocks": res.n_blocks,
                "n_snps": int(ct_block_sums.sum()),
            }
        )
    return pd.DataFrame(out_rows)
# Public API of this module; f2/f3/f4 are defined earlier in the file.
__all__ = [
    "f2",
    "f3",
    "f4",
    "d_stat",
    "f4_ratio",
    "fst",
]