Source code for snputils.datasets.synthetic

from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

import numpy as np
import pandas as pd

from snputils.ancestry.genobj.local import LocalAncestryObject
from snputils.phenotype.genobj import (
    CovariateObject,
    MultiPhenotypeObject,
    PhenotypeObject,
)
from snputils.snp.genobj.snpobj import SNPObject
from snputils.visualization.constants import CHROM_SIZES

if TYPE_CHECKING:
    from snputils.snp.genobj.grgobj import GRGObject


DEFAULT_SYNTHETIC_ANCESTRY_MAP = {"0": "AFR", "1": "EUR", "2": "EAS"}


_POPULATION_COORDS = {
    "AFR_ref": np.array([1.0, 0.0], dtype=np.float64),
    "EUR_ref": np.array([-0.5, 0.866], dtype=np.float64),
    "EAS_ref": np.array([-0.5, -0.866], dtype=np.float64),
    "ADMIXED_AFR_EUR": np.array([0.25, 0.433], dtype=np.float64),
    "ADMIXED_EUR_EAS": np.array([-0.5, 0.0], dtype=np.float64),
}

_LABEL_ANCESTRY_SOURCE_WEIGHTS = {
    "AFR_ref": {"AFR": np.asarray([0.75, 0.20, 0.05], dtype=np.float64)},
    "EUR_ref": {"EUR": np.asarray([0.70, 0.20, 0.10], dtype=np.float64)},
    "EAS_ref": {"EAS": np.asarray([0.72, 0.18, 0.10], dtype=np.float64)},
    "ADMIXED_AFR_EUR": {
        "AFR": np.asarray([0.20, 0.65, 0.15], dtype=np.float64),
        "EUR": np.asarray([0.12, 0.58, 0.30], dtype=np.float64),
    },
    "ADMIXED_EUR_EAS": {
        "EUR": np.asarray([0.18, 0.52, 0.30], dtype=np.float64),
        "EAS": np.asarray([0.15, 0.62, 0.23], dtype=np.float64),
    },
}


def _balanced_labels(n_samples: int, labels: Sequence[str]) -> np.ndarray:
    if n_samples <= 0:
        raise ValueError("n_samples must be positive.")
    if len(labels) == 0:
        raise ValueError("At least one label is required.")
    return np.asarray([labels[idx % len(labels)] for idx in range(n_samples)], dtype=object)


def _sample_latent_coordinates(labels: np.ndarray, rng: np.random.Generator) -> np.ndarray:
    coords = np.vstack([_POPULATION_COORDS[str(label)] for label in labels.astype(str)])
    coords = coords + rng.normal(0.0, 0.08, size=coords.shape)
    return coords.astype(np.float64, copy=False)


def _standardize(values: np.ndarray) -> np.ndarray:
    values = np.asarray(values, dtype=np.float64)
    centered = values - float(np.mean(values))
    sd = float(np.std(centered, ddof=0))
    if sd <= 0.0 or not np.isfinite(sd):
        return centered
    return centered / sd


def _allele_probabilities_from_coordinates(
    coords: np.ndarray,
    n_snps: int,
    rng: np.random.Generator,
    *,
    load_scale: float = 1.15,
    noise_scale: float = 0.10,
) -> np.ndarray:
    intercept = rng.normal(0.0, 0.45, size=n_snps)
    loadings = rng.normal(0.0, load_scale, size=(2, n_snps))
    logits = intercept[None, :] + coords @ loadings + rng.normal(0.0, noise_scale, size=(coords.shape[0], n_snps))
    return np.clip(1.0 / (1.0 + np.exp(-logits)), 0.02, 0.98)


[docs] def build_synthetic_snp_dataset( n_samples: int = 12, n_snps: int = 100, seed: int | None = 0, *, n_populations: int = 3, missing_rate: float = 0.0, phased: bool = True, chromosome: str = "1", sample_prefix: str = "S", variant_prefix: str = "rs_syn", ) -> SNPObject: """Build a small SNPObject with realistic metadata and population structure. Population labels are stored in ``sample_fid``. """ if n_samples <= 0: raise ValueError("n_samples must be positive.") if n_snps <= 0: raise ValueError("n_snps must be positive.") if n_populations <= 0: raise ValueError("n_populations must be positive.") if not 0.0 <= missing_rate < 1.0: raise ValueError("missing_rate must be in the interval [0, 1).") rng = np.random.default_rng(seed) samples = np.asarray([f"{sample_prefix}{idx + 1:02d}" for idx in range(n_samples)], dtype=object) population_names = [f"POP_{chr(ord('A') + idx)}" for idx in range(n_populations)] sample_fid = _balanced_labels(n_samples, population_names) sample_sex = np.asarray([(idx % 2) + 1 for idx in range(n_samples)], dtype=np.int8) base_af = rng.beta(0.8, 0.8, size=n_snps) pop_offsets = rng.normal(0.0, 0.12, size=(n_populations, n_snps)) pop_af = np.clip(base_af[None, :] + pop_offsets, 0.02, 0.98) pop_index = {pop: idx for idx, pop in enumerate(population_names)} genotypes = np.empty((n_snps, n_samples, 2), dtype=np.int8) for sample_idx, pop in enumerate(sample_fid.tolist()): probs = pop_af[pop_index[str(pop)]] genotypes[:, sample_idx, :] = rng.binomial(1, probs[:, None], size=(n_snps, 2)) if missing_rate > 0: missing = rng.random(genotypes.shape) < missing_rate genotypes[missing] = -1 refs = np.asarray(["A", "C", "G", "T"], dtype=object) alts = np.asarray(["G", "T", "A", "C"], dtype=object) variants_ref = np.resize(refs, n_snps) variants_alt = np.resize(alts, n_snps) variants_chrom = np.repeat(str(chromosome), n_snps).astype(object) variants_pos = np.arange(1, n_snps + 1, dtype=np.int64) * 1_000 variants_id = np.asarray([f"{variant_prefix}_{idx + 1:05d}" for idx in range(n_snps)], dtype=object) if not phased: genotype_matrix = genotypes.sum(axis=2, dtype=np.int8) genotype_matrix[np.any(genotypes < 0, axis=2)] = -1 else: genotype_matrix = genotypes return SNPObject( genotypes=genotype_matrix, samples=samples, sample_fid=sample_fid, sample_sex=sample_sex, variants_ref=variants_ref, variants_alt=variants_alt, variants_chrom=variants_chrom, variants_id=variants_id, variants_pos=variants_pos, )
[docs] def build_synthetic_phenotype_dataset( n_samples: int = 24, n_snps: int = 200, seed: int | None = 0, *, snpobj: SNPObject | None = None, missing_rate: float = 0.0, ) -> dict[str, object]: """Build a sample-aligned SNP/phenotype/covariate cohort for tutorials. The returned objects are intentionally small and self-contained so docs and tests can demonstrate phenotype handling, file readers, and association workflows without downloading external data. """ rng = np.random.default_rng(seed) if snpobj is None: snpobj = build_synthetic_snp_dataset( n_samples=n_samples, n_snps=n_snps, seed=seed, missing_rate=missing_rate, phased=True, ) elif not isinstance(snpobj, SNPObject): raise TypeError("snpobj must be a SNPObject.") sample_ids = np.asarray(snpobj.samples, dtype=object).astype(str) if sample_ids.size == 0: raise ValueError("snpobj must contain at least one sample.") genotypes = np.asarray(snpobj.genotypes) if genotypes.ndim == 3: dosages = genotypes.sum(axis=2, dtype=np.int16).astype(np.float64) dosages[np.any(genotypes < 0, axis=2)] = np.nan elif genotypes.ndim == 2: dosages = genotypes.astype(np.float64, copy=False) dosages[dosages < 0] = np.nan else: raise ValueError("Synthetic phenotype generation expects 2D or 3D genotypes.") n_snps_actual, n_samples_actual = dosages.shape if n_snps_actual == 0: raise ValueError("snpobj must contain at least one variant.") effect_count = min(3, n_snps_actual) effect_indexes = np.linspace(0, n_snps_actual - 1, num=effect_count, dtype=np.int64) effect_weights = np.array([0.95, -0.70, 0.45], dtype=np.float64)[:effect_count] dosage_matrix = dosages[effect_indexes].T.copy() col_means = np.nanmean(dosage_matrix, axis=0) col_means = np.where(np.isfinite(col_means), col_means, 1.0) missing = np.isnan(dosage_matrix) if np.any(missing): dosage_matrix[missing] = np.take(col_means, np.where(missing)[1]) dosage_signal = _standardize(dosage_matrix @ effect_weights) age = rng.integers(24, 71, size=n_samples_actual, endpoint=False).astype(np.int16) batch = ((np.arange(n_samples_actual) % 2) + 1).astype(np.int8) sex = np.asarray(getattr(snpobj, "sample_sex", ((np.arange(n_samples_actual) % 2) + 1)), dtype=np.int8) sample_fid = getattr(snpobj, "sample_fid", None) if sample_fid is None: ancestry_shift = np.zeros(n_samples_actual, dtype=np.float64) else: labels = np.asarray(sample_fid, dtype=object).astype(str) unique_labels = sorted(set(labels.tolist())) shift_map = { label: offset for label, offset in zip( unique_labels, np.linspace(-0.4, 0.4, num=len(unique_labels), dtype=np.float64), ) } ancestry_shift = np.asarray([shift_map[label] for label in labels], dtype=np.float64) quantitative_values = ( 0.05 * (age.astype(np.float64) - float(np.mean(age))) + 0.30 * (batch == 2).astype(np.float64) - 0.22 * (sex == 2).astype(np.float64) + 0.85 * dosage_signal + ancestry_shift + rng.normal(0.0, 0.55, size=n_samples_actual) ) quantitative_values = _standardize(quantitative_values) liability = ( 0.70 * dosage_signal + 0.35 * _standardize(age) + 0.25 * (batch == 2).astype(np.float64) - 0.20 * (sex == 2).astype(np.float64) + 0.35 * ancestry_shift + rng.normal(0.0, 0.65, size=n_samples_actual) ) binary_01 = (liability > np.median(liability)).astype(np.int8) if binary_01.sum() == 0 or binary_01.sum() == binary_01.size: order = np.argsort(liability) binary_01 = np.zeros(n_samples_actual, dtype=np.int8) binary_01[order[n_samples_actual // 2 :]] = 1 binary_12 = (binary_01 + 1).astype(np.int8) phenotype_table = pd.DataFrame( { "IID": sample_ids, "trait_quantitative": np.round(quantitative_values, 4), "trait_binary_01": binary_01, "trait_binary_12": binary_12, "age": age.astype(np.int64), "batch": batch.astype(np.int64), "sex": sex.astype(np.int64), } ) quantitative = PhenotypeObject( samples=sample_ids, values=phenotype_table["trait_quantitative"].to_numpy(), phenotype_name="TRAIT_Q", quantitative=True, ) binary = PhenotypeObject( samples=sample_ids, values=phenotype_table["trait_binary_12"].to_numpy(), phenotype_name="TRAIT_BIN", quantitative=None, ) covariates = CovariateObject( samples=sample_ids, values=phenotype_table[["age", "batch", "sex"]].to_numpy(), covariate_names=["age", "batch", "sex"], ) return { "snpobj": snpobj, "phen_df": phenotype_table, "multi_phenotype": MultiPhenotypeObject(phenotype_table.copy(), sample_column="IID"), "quantitative": quantitative, "binary": binary, "covariates": covariates, "effect_variant_ids": np.asarray(snpobj.variants_id, dtype=object)[effect_indexes].astype(str).tolist(), "shuffled_phen_df": phenotype_table.sample(frac=1.0, random_state=seed).reset_index(drop=True), }
def _make_structured_snpobject( *, samples: np.ndarray, labels: np.ndarray, variant_ids: np.ndarray, variant_positions: np.ndarray, seed: int | None, chromosome: str = "1", missing_rate: float = 0.0, ) -> SNPObject: rng = np.random.default_rng(seed) n_snps = len(variant_ids) n_samples = len(samples) coords = _sample_latent_coordinates(labels, rng) sample_af = _allele_probabilities_from_coordinates(coords, n_snps, rng) genotypes = np.empty((n_snps, n_samples, 2), dtype=np.int8) for sample_idx in range(n_samples): probs = sample_af[sample_idx] genotypes[:, sample_idx, :] = rng.binomial(1, probs[:, None], size=(n_snps, 2)) if missing_rate > 0: missing = rng.random(genotypes.shape) < missing_rate genotypes[missing] = -1 refs = np.asarray(["A", "C", "G", "T"], dtype=object) alts = np.asarray(["G", "T", "A", "C"], dtype=object) return SNPObject( genotypes=genotypes, samples=samples.astype(object), sample_fid=labels.astype(object), sample_sex=np.asarray([(idx % 2) + 1 for idx in range(n_samples)], dtype=np.int8), variants_ref=np.resize(refs, n_snps), variants_alt=np.resize(alts, n_snps), variants_chrom=np.repeat(str(chromosome), n_snps).astype(object), variants_id=variant_ids.astype(object), variants_pos=variant_positions.astype(np.int64, copy=False), ) def _build_lai_for_snpobject( snpobj: SNPObject, labels: np.ndarray, ancestry_map: dict[str, str], seed: int | None, *, lai: np.ndarray | None = None, ) -> LocalAncestryObject: rng = np.random.default_rng(seed) n_snps = snpobj.n_snps n_samples = snpobj.n_samples n_ancestries = len(ancestry_map) ancestry_names = list(ancestry_map.values()) label_to_ancestry = { "AFR_ref": ancestry_names.index("AFR") if "AFR" in ancestry_names else 0, "EUR_ref": ancestry_names.index("EUR") if "EUR" in ancestry_names else min(1, n_ancestries - 1), "EAS_ref": ancestry_names.index("EAS") if "EAS" in ancestry_names else min(2, n_ancestries - 1), } if lai is None: lai = np.empty((n_snps, n_samples * 2), dtype=np.int8) for sample_idx, label in enumerate(labels.astype(str).tolist()): if label in label_to_ancestry: state = label_to_ancestry[label] lai[:, 2 * sample_idx : 2 * sample_idx + 2] = state else: props = rng.dirichlet(np.ones(n_ancestries)) states = rng.choice(n_ancestries, size=(n_snps, 2), p=props) lai[:, 2 * sample_idx : 2 * sample_idx + 2] = states else: lai = np.asarray(lai, dtype=np.int8, order="C") if lai.shape != (n_snps, n_samples * 2): raise ValueError( f"Provided lai has shape {lai.shape}, expected {(n_snps, n_samples * 2)}." ) if np.any((lai < 0) | (lai >= n_ancestries)): raise ValueError("Provided lai contains ancestry codes outside ancestry_map.") physical_pos = np.column_stack([snpobj.variants_pos, snpobj.variants_pos + 999]) laiobj = LocalAncestryObject( haplotypes=[f"{sample}.{phase}" for sample in snpobj.samples.astype(str) for phase in (0, 1)], lai=lai, samples=snpobj.samples.astype(str).tolist(), ancestry_map=ancestry_map, chromosomes=snpobj.variants_chrom.copy(), physical_pos=physical_pos.astype(np.int64, copy=False), window_sizes=np.ones(n_snps, dtype=np.int64), ) snpobj.calldata_lai = lai.reshape(n_snps, n_samples, 2) snpobj.ancestry_map = ancestry_map return laiobj def _sample_lai_matrix_for_labels( labels: np.ndarray, n_snps: int, ancestry_map: dict[str, str], rng: np.random.Generator, ) -> np.ndarray: ancestry_names = list(ancestry_map.values()) n_ancestries = len(ancestry_names) ancestry_index = {name: idx for idx, name in enumerate(ancestry_names)} def _one_hot(name: str) -> np.ndarray: weights = np.zeros(n_ancestries, dtype=np.float64) weights[ancestry_index.get(name, 0)] = 1.0 return weights lai = np.empty((n_snps, labels.size * 2), dtype=np.int8) labels_list = labels.astype(str).tolist() for sample_idx, label in enumerate(labels_list): if label == "AFR_ref": base_props = _one_hot("AFR") elif label == "EUR_ref": base_props = _one_hot("EUR") elif label == "EAS_ref": base_props = _one_hot("EAS") elif label == "ADMIXED_AFR_EUR": afr = ancestry_index.get("AFR", 0) eur = ancestry_index.get("EUR", 0) p_afr = float(rng.beta(2.2, 2.2)) base_props = np.zeros(n_ancestries, dtype=np.float64) base_props[afr] = p_afr base_props[eur] = 1.0 - p_afr elif label == "ADMIXED_EUR_EAS": eur = ancestry_index.get("EUR", 0) eas = ancestry_index.get("EAS", 0) p_eur = float(rng.beta(2.2, 2.2)) base_props = np.zeros(n_ancestries, dtype=np.float64) base_props[eur] = p_eur base_props[eas] = 1.0 - p_eur else: base_props = rng.dirichlet(np.ones(n_ancestries, dtype=np.float64)) for hap in range(2): col = 2 * sample_idx + hap current = int(rng.choice(n_ancestries, p=base_props)) for snp_idx in range(n_snps): lai[snp_idx, col] = current if rng.random() < (1.0 / 35.0): current = int(rng.choice(n_ancestries, p=base_props)) return lai
[docs] def build_synthetic_mdpca_dataset( n_samples: int = 200, n_snps: int = 1_000, seed: int | None = 0, *, ancestry_map: dict[str, str] | None = None, ) -> dict[str, object]: """Build one-array SNP, LAI, and labels inputs for mdPCA examples. Notes: - This generator is intended for missing-data mdPCA tutorials. - Genotypes are generated from sample-level label structure, while LAI is generated separately. In admixed labels, haplotype ancestry states are not used to drive ancestry-specific allele frequencies. - For ancestry-specific masking demos, prefer :func:`build_synthetic_maasmds_dataset`, which couples haplotype genotypes to local ancestry states. """ if n_samples < 4: raise ValueError("n_samples must be at least 4 for mdPCA/maasMDS examples.") if n_snps <= 0: raise ValueError("n_snps must be positive.") ancestry_map = dict(DEFAULT_SYNTHETIC_ANCESTRY_MAP if ancestry_map is None else ancestry_map) labels_cycle = ["AFR_ref", "EUR_ref", "EAS_ref", "ADMIXED_AFR_EUR", "ADMIXED_EUR_EAS"] labels = _balanced_labels(n_samples, labels_cycle) samples = np.asarray([f"M{i + 1:04d}" for i in range(n_samples)], dtype=object) variant_positions = np.arange(1, n_snps + 1, dtype=np.int64) * 1_000 variant_ids = np.asarray([f"rs_mdpca_{idx + 1:06d}" for idx in range(n_snps)], dtype=object) snpobj = _make_structured_snpobject( samples=samples, labels=labels, variant_ids=variant_ids, variant_positions=variant_positions, seed=seed, missing_rate=0.01, ) labels_df = pd.DataFrame({"indID": snpobj.samples.astype(str), "label": labels}) laiobj = _build_lai_for_snpobject(snpobj, labels, ancestry_map, seed) return { "snpobj": snpobj, "laiobj": laiobj, "labels": labels_df, "ancestry_map": ancestry_map, }
[docs] def build_synthetic_maasmds_dataset( n_samples_per_array: int = 200, n_snps_per_array: int = 1_000, n_arrays: int = 3, seed: int | None = 0, *, ancestry_map: dict[str, str] | None = None, triple_shared_fraction: float = 0.25, pair_shared_fraction: float = 0.20, ) -> dict[str, object]: """Build multi-array SNP, LAI, and labels inputs for maasMDS examples. With the default three arrays and 1,000 SNPs per array, each array has 250 SNPs shared across all arrays, 200 SNPs shared with each other array, and 350 array-specific SNPs. Thus overlap decays from within-array to pairwise intersections to the three-way intersection. Genotypes are sampled haplotype-by-haplotype from local ancestry states. Within each continental ancestry we simulate multiple latent sources so ancestry-masked analyses still contain population-specific structure. """ if n_arrays != 3: raise ValueError("Only n_arrays=3 is currently supported.") if n_samples_per_array <= 0: raise ValueError("n_samples_per_array must be positive.") if n_snps_per_array <= 0: raise ValueError("n_snps_per_array must be positive.") ancestry_map = dict(DEFAULT_SYNTHETIC_ANCESTRY_MAP if ancestry_map is None else ancestry_map) n_all = int(round(n_snps_per_array * triple_shared_fraction)) n_pair = int(round(n_snps_per_array * pair_shared_fraction)) n_unique = n_snps_per_array - n_all - 2 * n_pair if n_all <= 0 or n_pair <= 0 or n_unique <= 0: raise ValueError( "Overlap fractions must leave positive all-shared, pair-shared, and unique SNP counts." ) next_pos = 1 def alloc(prefix: str, count: int) -> tuple[np.ndarray, np.ndarray]: nonlocal next_pos positions = np.arange(next_pos, next_pos + count, dtype=np.int64) * 1_000 ids = np.asarray([f"{prefix}_{idx + 1:06d}" for idx in range(count)], dtype=object) next_pos += count return ids, positions all_ids, all_pos = alloc("rs_all_arrays", n_all) pair_defs = {(0, 1): alloc("rs_array_1_2", n_pair), (0, 2): alloc("rs_array_1_3", n_pair), (1, 2): alloc("rs_array_2_3", n_pair)} unique_defs = [alloc(f"rs_array_{idx + 1}_only", n_unique) for idx in range(n_arrays)] labels_cycle = ["AFR_ref", "EUR_ref", "EAS_ref", "ADMIXED_AFR_EUR", "ADMIXED_EUR_EAS"] rng = np.random.default_rng(seed) all_variant_positions = np.concatenate( [all_pos] + [pos for _, pos in pair_defs.values()] + [pos for _, pos in unique_defs] ) ancestry_names = [str(name) for name in ancestry_map.values()] fallback_centroid = np.mean( np.vstack([_POPULATION_COORDS["AFR_ref"], _POPULATION_COORDS["EUR_ref"], _POPULATION_COORDS["EAS_ref"]]), axis=0, ) ancestry_centroids = { "AFR": _POPULATION_COORDS["AFR_ref"], "EUR": _POPULATION_COORDS["EUR_ref"], "EAS": _POPULATION_COORDS["EAS_ref"], } n_sources_per_ancestry = 3 source_offsets = np.asarray( [ [0.18, 0.00], [-0.14, 0.12], [-0.07, -0.16], ], dtype=np.float64, ) source_coord_rows = [] source_row_lookup: dict[tuple[str, int], int] = {} for anc in ancestry_names: centroid = np.asarray(ancestry_centroids.get(anc, fallback_centroid), dtype=np.float64) coords = centroid[None, :] + source_offsets for src_idx in range(n_sources_per_ancestry): source_row_lookup[(anc, src_idx)] = len(source_coord_rows) source_coord_rows.append(coords[src_idx]) af_by_source_matrix = _allele_probabilities_from_coordinates( np.vstack(source_coord_rows), len(all_variant_positions), rng, load_scale=1.25, noise_scale=0.02, ) af_by_ancestry_source_and_pos = { anc: { src_idx: { int(pos): float(af_by_source_matrix[source_row_lookup[(anc, src_idx)], pos_idx]) for pos_idx, pos in enumerate(all_variant_positions.tolist()) } for src_idx in range(n_sources_per_ancestry) } for anc in ancestry_names } source_mean_by_ancestry = { anc: np.mean( np.stack( [ np.asarray( [af_by_ancestry_source_and_pos[anc][src_idx][int(pos)] for pos in all_variant_positions.tolist()], dtype=np.float64, ) for src_idx in range(n_sources_per_ancestry) ], axis=0, ), axis=0, ) for anc in ancestry_names } ancestry_mean_stack = np.stack([source_mean_by_ancestry[anc] for anc in ancestry_names], axis=0) global_maf = np.mean(ancestry_mean_stack, axis=0) ancestry_informativeness = np.max(ancestry_mean_stack, axis=0) - np.min(ancestry_mean_stack, axis=0) snpobjs = [] laiobjs = [] labels_frames = [] for array_idx in range(n_arrays): samples = np.asarray( [f"array{array_idx + 1}_sample{i + 1:04d}" for i in range(n_samples_per_array)], dtype=object, ) labels = _balanced_labels(n_samples_per_array, labels_cycle) ids_parts = [all_ids] pos_parts = [all_pos] for pair, (ids, pos) in pair_defs.items(): if array_idx in pair: ids_parts.append(ids) pos_parts.append(pos) ids_parts.append(unique_defs[array_idx][0]) pos_parts.append(unique_defs[array_idx][1]) variant_ids = np.concatenate(ids_parts) variant_positions = np.concatenate(pos_parts) order = np.argsort(variant_positions) variant_ids = variant_ids[order] variant_positions = variant_positions[order] n_array_snps = len(variant_ids) genotypes = np.empty((n_array_snps, n_samples_per_array, 2), dtype=np.int8) array_rng = np.random.default_rng(None if seed is None else seed + array_idx + 10) lai = _sample_lai_matrix_for_labels(labels, n_array_snps, ancestry_map, array_rng) anc_name_by_code = {idx: str(name) for idx, name in enumerate(ancestry_map.values())} pos_to_global_index = {int(pos): idx for idx, pos in enumerate(all_variant_positions.tolist())} snp_global_index = np.asarray([pos_to_global_index[int(pos)] for pos in variant_positions.tolist()], dtype=np.int64) array_maf = global_maf[snp_global_index] array_aim = ancestry_informativeness[snp_global_index] if array_idx == 0: ascertainment_delta = -0.55 * (array_maf - np.mean(array_maf)) elif array_idx == 1: ascertainment_delta = 0.55 * (array_maf - np.mean(array_maf)) else: ascertainment_delta = 0.95 * (array_aim - np.mean(array_aim)) probs_by_ancestry_source = { anc_name: { src_idx: np.asarray( [af_by_ancestry_source_and_pos[anc_name][src_idx][int(pos)] for pos in variant_positions.tolist()], dtype=np.float64, ) for src_idx in range(n_sources_per_ancestry) } for anc_name in ancestry_names } def _source_weights_for_label_ancestry(label: str, ancestry: str) -> np.ndarray: label_map = _LABEL_ANCESTRY_SOURCE_WEIGHTS.get(label, {}) if ancestry in label_map: base = np.asarray(label_map[ancestry], dtype=np.float64) else: base = np.full(n_sources_per_ancestry, 1.0 / n_sources_per_ancestry, dtype=np.float64) draw = array_rng.dirichlet(24.0 * base + 0.3) return draw / np.sum(draw) for sample_idx in range(n_samples_per_array): label = str(labels[sample_idx]) source_by_ancestry: dict[str, int] = {} for anc_name in ancestry_names: weights = _source_weights_for_label_ancestry(label, anc_name) source_by_ancestry[anc_name] = int(array_rng.choice(n_sources_per_ancestry, p=weights)) for hap in range(2): codes = lai[:, 2 * sample_idx + hap] hap_probs = np.empty(n_array_snps, dtype=np.float64) for code, anc_name in anc_name_by_code.items(): mask = codes == code if np.any(mask): source_idx = source_by_ancestry[anc_name] hap_probs[mask] = probs_by_ancestry_source[anc_name][source_idx][mask] adjusted_probs = np.clip(hap_probs + ascertainment_delta, 0.01, 0.99) genotypes[:, sample_idx, hap] = array_rng.binomial(1, adjusted_probs) missing = array_rng.random(genotypes.shape) < 0.01 genotypes[missing] = -1 refs = np.asarray(["A", "C", "G", "T"], dtype=object) alts = np.asarray(["G", "T", "A", "C"], dtype=object) snpobj = SNPObject( genotypes=genotypes, samples=samples.astype(object), sample_fid=labels.astype(object), sample_sex=np.asarray([(idx % 2) + 1 for idx in range(n_samples_per_array)], dtype=np.int8), variants_ref=np.resize(refs, len(variant_ids)), variants_alt=np.resize(alts, len(variant_ids)), variants_chrom=np.repeat("1", len(variant_ids)).astype(object), variants_id=variant_ids.astype(object), variants_pos=variant_positions.astype(np.int64, copy=False), ) laiobj = _build_lai_for_snpobject( snpobj, labels, ancestry_map, None if seed is None else seed + array_idx + 100, lai=lai, ) snpobjs.append(snpobj) laiobjs.append(laiobj) labels_frames.append(pd.DataFrame({"indID": samples.astype(str), "label": labels, "array": array_idx + 1})) variant_sets = [set(map(int, snp.variants_pos.tolist())) for snp in snpobjs] overlap_counts = { "array_1": len(variant_sets[0]), "array_2": len(variant_sets[1]), "array_3": len(variant_sets[2]), "array_1_2": len(variant_sets[0] & variant_sets[1]), "array_1_3": len(variant_sets[0] & variant_sets[2]), "array_2_3": len(variant_sets[1] & variant_sets[2]), "array_1_2_3": len(variant_sets[0] & variant_sets[1] & variant_sets[2]), } return { "snpobjs": snpobjs, "laiobjs": laiobjs, "labels": pd.concat(labels_frames, ignore_index=True), "ancestry_map": ancestry_map, "overlap_counts": overlap_counts, }
[docs] def build_synthetic_chromosome_painting_dataset( n_samples: int = 3, windows_per_chromosome: int = 60, seed: int | None = 42, *, build: str = "hg38", chromosomes: Sequence[str] | None = None, ancestry_map: dict[str, str] | None = None, male_samples: Sequence[str] | None = None, ) -> dict[str, object]: """Build synthetic LAI data for chromosome painting examples. By default this simulates diploid local ancestry across chromosomes ``1``-``22`` and ``X``. The default sample metadata marks every sample as female so the reported sex is consistent with the diploid-X simulation used for painting. """ if n_samples <= 0: raise ValueError("n_samples must be positive.") if windows_per_chromosome <= 0: raise ValueError("windows_per_chromosome must be positive.") if build not in CHROM_SIZES: raise ValueError(f"Unknown build {build!r}. Available builds: {sorted(CHROM_SIZES)}") ancestry_map = dict(DEFAULT_SYNTHETIC_ANCESTRY_MAP if ancestry_map is None else ancestry_map) supported_chroms = {str(chrom) for chrom in range(1, 23)} | {"X"} requested_chroms = [str(c) for c in (chromosomes or [*range(1, 23), "X"])] chroms = [chrom for chrom in requested_chroms if chrom in supported_chroms] if not chroms: raise ValueError( "No supported chromosomes selected. Pass chromosomes from '1' to '22' and/or 'X'." ) rng = np.random.default_rng(seed) samples = [f"sample{i}" for i in range(n_samples)] if male_samples is None: male_samples = [] male_samples = set(map(str, male_samples)) n_windows = len(chroms) * windows_per_chromosome n_haplotypes = n_samples * 2 chrom_array = np.repeat(np.asarray(chroms, dtype=object), windows_per_chromosome) physical_pos = np.zeros((n_windows, 2), dtype=np.int64) centimorgan_pos = np.zeros((n_windows, 2), dtype=np.float64) window_sizes = np.full(n_windows, 10, dtype=np.int32) for chrom in chroms: chrom_size = CHROM_SIZES[build][chrom] win_size = chrom_size // windows_per_chromosome idx = np.where(chrom_array == chrom)[0] for local_i, global_i in enumerate(idx): start = local_i * win_size end = start + win_size if local_i < windows_per_chromosome - 1 else chrom_size physical_pos[global_i] = [start, end] centimorgan_pos[global_i] = [start / 1e6, end / 1e6] lai = np.zeros((n_windows, n_haplotypes), dtype=np.int32) n_ancestries = len(ancestry_map) for hap in range(n_haplotypes): for chrom_idx in range(len(chroms)): start = chrom_idx * windows_per_chromosome stop = start + windows_per_chromosome pos = start while pos < stop: block_len = int(rng.integers(4, 20)) lai[pos : min(pos + block_len, stop), hap] = int(rng.integers(0, n_ancestries)) pos += block_len laiobj = LocalAncestryObject( haplotypes=[f"{sample}.{phase}" for sample in samples for phase in (0, 1)], lai=lai, samples=samples, ancestry_map=ancestry_map, chromosomes=chrom_array, physical_pos=physical_pos, centimorgan_pos=centimorgan_pos, window_sizes=window_sizes, ) sample_sex = pd.DataFrame( { "sample": samples, "sex": ["male" if sample in male_samples else "female" for sample in samples], } ) return { "laiobj": laiobj, "sample_sex": sample_sex, "build": build, "chromosomes": chroms, "ancestry_map": ancestry_map, }
[docs] def build_synthetic_grg() -> GRGObject: """Build a tiny deterministic GRGObject.""" try: import pygrgl as pyg from snputils.snp.genobj.grgobj import GRGObject except ModuleNotFoundError as exc: if exc.name == "pygrgl": raise ImportError( "GRG support requires the optional dependency 'pygrgl'. " "Install pygrgl separately: " "https://github.com/aprilweilab/grgl#installing-from-pip" ) from exc raise grg = pyg.MutableGRG(6, 2, True) root = grg.make_node() left = grg.make_node() right = grg.make_node() grg.connect(root, left) grg.connect(root, right) for sample in [0, 1, 2]: grg.connect(left, sample) for sample in [3, 4, 5]: grg.connect(right, sample) grg.add_mutation(pyg.Mutation(100.0, "G", "A", 0.0), root) grg.add_mutation(pyg.Mutation(110.0, "T", "C", 0.0), left) grg.add_mutation(pyg.Mutation(120.0, "C", "G", 0.0), right) grg.add_mutation(pyg.Mutation(130.0, "A", "T", 0.0), 0) grg.add_mutation(pyg.Mutation(140.0, "G", "T", 0.0), 5) return GRGObject(genotypes=grg, mutable=True)