Source code for snputils.tools.admixture_mapping

import argparse
import csv
import logging
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union

import numpy as np
import pandas as pd
from snputils.ancestry.genobj.local import LocalAncestryObject
from snputils.ancestry.io.local.read import LAIReader
from snputils.phenotype.genobj import CovariateObject, PhenotypeObject
from snputils.phenotype.io.read import PhenotypeReader

from ._association import (
    _apply_multiple_testing_adjustment,
    _compute_effective_chunk_size,
    _compute_linear_ci_beta,
    _compute_logistic_ci_or,
    _compute_multiple_testing_adjustments,
    _coerce_covar_source,
    _confidence_interval_label,
    _enforce_memory_budget,
    _fit_linear_batch,
    _fit_linear_batch_with_covariates,
    _fit_logistic_batch,
    _fit_logistic_batch_with_covariates,
    _get_process_rss_mb,
    _normalize_chromosome,
    _odds_ratio_batch,
    _open_tsv_for_write,
    _prepare_fwl,
    _read_sample_list,
    _resolve_output_path,
)

log = logging.getLogger(__name__)

HLA_START = 25_477_797
HLA_END = 36_448_354


def _coerce_phenotype_source(
    phe_source: Union[str, Path, PhenotypeObject],
    *,
    phe_id: Optional[str],
    quantitative: Optional[bool],
) -> Tuple[PhenotypeObject, str]:
    if isinstance(phe_source, PhenotypeObject):
        phenotype_obj = (
            phe_source
            if quantitative is None
            else PhenotypeObject(
                samples=phe_source.samples,
                values=phe_source.values,
                phenotype_name=phe_source.phenotype_name,
                quantitative=quantitative,
            )
        )
        return phenotype_obj, str(phe_id or phenotype_obj.phenotype_name)

    if phe_id is None:
        raise TypeError(
            "run_admixture_mapping() missing required argument: 'phe_id' when phenotype input is a path."
        )
    phenotype_obj = PhenotypeReader(phe_source).read(
        phenotype_col=phe_id,
        quantitative=quantitative,
    )
    return phenotype_obj, str(phe_id)


def add_admixmap_arguments(parser: argparse.ArgumentParser) -> None:
    parser.add_argument(
        "--batch-size",
        dest="batch_size",
        required=False,
        default=32768,
        type=int,
        help="Maximum number of windows processed per chunk while building ancestry dosages.",
    )
    parser.add_argument(
        "--memory",
        dest="memory",
        required=False,
        default=None,
        type=int,
        help="Peak RSS-delta memory cap in MiB for internal chunked processing.",
    )
    parser.add_argument(
        "--keep-hla",
        dest="keep_hla",
        required=False,
        action="store_true",
        help="Keep chr6 HLA windows (default is to remove them).",
    )
    parser.add_argument(
        "--quantitative",
        dest="quantitative",
        required=False,
        action="store_true",
        default=None,
        help="Optional override to force quantitative (linear) mode. Default is automatic trait detection.",
    )
    parser.add_argument(
        "--verbose",
        dest="verbose",
        required=False,
        action="store_true",
        help="Print progress (windows processed, elapsed time, rate) during admixture mapping.",
    )
    parser.add_argument(
        "--covar-path",
        dest="covar_path",
        required=False,
        type=str,
        default=None,
        help="Path to covariate file (whitespace-delimited, header with #FID IID or #IID plus covariate columns).",
    )
    parser.add_argument(
        "--covar-col-nums",
        dest="covar_col_nums",
        required=False,
        type=str,
        default=None,
        help='Covariate column numbers relative to first covariate column (e.g. "1-5,7").',
    )
    parser.add_argument(
        "--covar-variance-standardize",
        dest="covar_variance_standardize",
        required=False,
        action="store_true",
        help="Center and variance-standardize selected covariates.",
    )
    parser.add_argument(
        "--ci",
        dest="ci",
        required=False,
        type=float,
        default=None,
        help="Confidence level in (0, 1), e.g. 0.95 for L95/U95 columns.",
    )
    parser.add_argument(
        "--adjust",
        dest="adjust",
        required=False,
        action="store_true",
        help="Add Bonferroni and Benjamini-Hochberg FDR adjusted p-values.",
    )
    parser.add_argument(
        "--keep-path",
        dest="keep_path",
        required=False,
        type=str,
        default=None,
        help="Path to keep file (FID IID or IID per line) for sample inclusion.",
    )
    parser.add_argument(
        "--sample-remove",
        dest="remove_path",
        required=False,
        type=str,
        default=None,
        help="Path to remove file (FID IID or IID per line) for sample exclusion.",
    )
    parser.add_argument(
        "--results-path",
        dest="results_path",
        required=False,
        type=str,
        default="admixmap.tsv.gz",
        help="Path used to save resulting data in compressed .tsv file (default: admixmap.tsv.gz).",
    )
    parser.add_argument(
        "--manhattan-plot",
        dest="manhattan_plot",
        required=False,
        default=None,
        type=str,
        help="Optional path to save a Manhattan plot after admixture mapping completes (.pdf / .svg / .png, ...).",
    )
    parser.add_argument(
        "--qq-plot",
        dest="qq_plot",
        required=False,
        default=None,
        type=str,
        help="Optional path to save a Q-Q plot after admixture mapping completes (.pdf / .svg / .png, ...).",
    )
    required_argv = parser.add_argument_group("required arguments")
    required_argv.add_argument(
        "--phe-id",
        dest="phe_id",
        required=True,
        type=str,
        help="Phenotype ID / column name to analyze.",
    )
    required_argv.add_argument(
        "--phe-path",
        dest="phe_path",
        required=True,
        type=str,
        help="Path to phenotype file (headered text with IID column and one or more phenotype columns; e.g. .txt, .phe, .pheno).",
    )
    required_argv.add_argument(
        "--lai-path",
        dest="lai_path",
        required=True,
        type=str,
        help="Path of the local ancestry file (.msp/.msp.tsv or FLARE .anc.vcf.gz).",
    )


def parse_admixmap_args(argv: Sequence[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(prog="admixture-map", description="Admixture Mapping.")
    add_admixmap_arguments(parser)
    return parser.parse_args(argv)


def _chromosome_as_int(chromosome: object) -> Optional[int]:
    text_lower = str(chromosome).strip().lower().replace("chr", "")
    return int(text_lower) if text_lower.isdigit() else None


def _remove_hla_windows(lai_obj) -> int:
    if lai_obj.chromosomes is None or lai_obj.physical_pos is None:
        return 0

    indexes_to_remove: List[int] = []
    for i, chrom in enumerate(lai_obj.chromosomes):
        chrom_int = _chromosome_as_int(chrom)
        if chrom_int != 6:
            continue
        start_pos = int(lai_obj.physical_pos[i, 0])
        end_pos = int(lai_obj.physical_pos[i, 1])
        if (HLA_START <= start_pos <= HLA_END) or (HLA_START <= end_pos <= HLA_END):
            indexes_to_remove.append(i)

    if indexes_to_remove:
        lai_obj.filter_windows(indexes=np.asarray(indexes_to_remove, dtype=int), include=False, inplace=True)
    return len(indexes_to_remove)


def _resolve_ancestries(lai_obj) -> List[Tuple[int, str]]:
    if lai_obj.ancestry_map is None:
        return [(int(code), f"ANC{int(code)}") for code in sorted(np.unique(lai_obj.lai).astype(int))]
    pairs = []
    for code_str, label in lai_obj.ancestry_map.items():
        pairs.append((int(code_str), str(label)))
    return sorted(pairs, key=lambda x: x[0])


def _resolve_ancestries_from_metadata(
    lai_reader,
    ancestry_map: Optional[Dict[str, str]],
    chunk_size: int,
    sample_indices: Optional[np.ndarray] = None,
) -> List[Tuple[int, str]]:
    if ancestry_map is not None:
        pairs = [(int(code), str(label)) for code, label in ancestry_map.items()]
        return sorted(pairs, key=lambda x: x[0])

    unique_codes: set[int] = set()
    for chunk in lai_reader.iter_windows(chunk_size=chunk_size, sample_indices=sample_indices):
        unique_codes.update(int(code) for code in np.unique(chunk["lai"]))
    return [(code, f"ANC{code}") for code in sorted(unique_codes)]


def _iter_laiobj_windows(
    laiobj: LocalAncestryObject,
    chunk_size: int,
    sample_indices: Optional[np.ndarray] = None,
):
    lai = np.asarray(laiobj.lai)
    if lai.ndim != 2:
        raise ValueError("LocalAncestryObject.lai must be a 2D array.")
    if lai.shape[1] % 2 != 0:
        raise ValueError("LocalAncestryObject.lai must contain an even number of haplotypes.")

    n_windows = int(lai.shape[0])
    n_samples = int(lai.shape[1] // 2)

    if sample_indices is None:
        hap_indices = None
    else:
        sample_indices = np.asarray(sample_indices, dtype=np.int64)
        if sample_indices.size == 0:
            raise ValueError("sample_indices cannot be empty.")
        if np.any(sample_indices < 0) or np.any(sample_indices >= n_samples):
            raise ValueError("sample_indices contain out-of-bounds sample indexes.")
        if sample_indices.size == n_samples and np.array_equal(sample_indices, np.arange(n_samples)):
            hap_indices = None
        else:
            hap_indices = np.empty(sample_indices.size * 2, dtype=np.int64)
            hap_indices[0::2] = 2 * sample_indices
            hap_indices[1::2] = 2 * sample_indices + 1

    chromosomes = (
        np.asarray(laiobj.chromosomes)
        if laiobj.chromosomes is not None
        else np.full(n_windows, ".", dtype=object)
    )
    physical_pos = (
        np.asarray(laiobj.physical_pos)
        if laiobj.physical_pos is not None
        else None
    )

    for start in range(0, n_windows, int(chunk_size)):
        stop = min(start + int(chunk_size), n_windows)
        chunk_lai = lai[start:stop] if hap_indices is None else lai[start:stop, :][:, hap_indices]
        yield {
            "window_indexes": np.arange(start, stop, dtype=np.int64),
            "chromosomes": chromosomes[start:stop],
            "physical_pos": physical_pos[start:stop] if physical_pos is not None else None,
            "lai": chunk_lai,
        }


def _align_samples(
    lai_samples: Sequence[str],
    phe_samples: Sequence[str],
    y: np.ndarray,
    quantitative: bool = False,
    keep_ids: Optional[Set[str]] = None,
    remove_ids: Optional[Set[str]] = None,
    covar_samples: Optional[Sequence[str]] = None,
    covar_matrix: Optional[np.ndarray] = None,
) -> Tuple[np.ndarray, np.ndarray, List[str], Optional[np.ndarray]]:
    sample_to_idx = {sample_id: idx for idx, sample_id in enumerate(lai_samples)}
    covar_to_idx: Optional[Dict[str, int]] = None
    if covar_samples is not None:
        if covar_matrix is None:
            raise ValueError("Internal error: covariate samples provided without covariate matrix.")
        covar_to_idx = {sample_id: idx for idx, sample_id in enumerate(covar_samples)}

    lai_indexes: List[int] = []
    y_aligned: list = []
    covar_aligned: List[np.ndarray] = []
    aligned_samples: List[str] = []

    for sid, yi in zip(phe_samples, y):
        if keep_ids is not None and sid not in keep_ids:
            continue
        if remove_ids is not None and sid in remove_ids:
            continue

        idx = sample_to_idx.get(sid)
        if idx is None:
            continue
        if covar_to_idx is not None:
            cov_idx = covar_to_idx.get(sid)
            if cov_idx is None:
                continue
        lai_indexes.append(idx)
        y_aligned.append(float(yi) if quantitative else int(yi))
        if covar_to_idx is not None and covar_matrix is not None:
            covar_aligned.append(covar_matrix[cov_idx].astype(np.float64, copy=False))
        aligned_samples.append(sid)

    if not lai_indexes:
        raise ValueError("No overlapping samples between phenotype and LAI source.")

    if quantitative:
        y_arr = np.asarray(y_aligned, dtype=np.float64)
        if np.var(y_arr) <= 0.0:
            raise ValueError("Quantitative phenotype has zero variance after LAI/PHE sample intersection.")
    else:
        y_arr = np.asarray(y_aligned, dtype=np.int8)
        if int(np.sum(y_arr)) == 0:
            raise ValueError("No cases after LAI/PHE sample intersection.")
        if int(np.sum(y_arr)) == len(y_arr):
            raise ValueError("No controls after LAI/PHE sample intersection.")

    covar_out = (
        np.asarray(covar_aligned, dtype=np.float64)
        if covar_to_idx is not None
        else None
    )
    return np.asarray(lai_indexes, dtype=np.int64), y_arr, aligned_samples, covar_out


def _compute_group_counts_from_lai(
    maternal: np.ndarray,
    paternal: np.ndarray,
    ancestry_code: int,
    y_binary: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """Compute group (dosage-bin) counts directly from haplotype arrays.

    Avoids materializing the full dosage array by computing counts directly
    from the maternal/paternal match indicators.
    """
    n_samples = maternal.shape[1]
    cases_total = int(np.sum(y_binary))

    m = maternal == ancestry_code
    p = paternal == ancestry_code

    y_int = y_binary.astype(np.int64, copy=False)

    sum_m = np.sum(m, axis=1, dtype=np.int64)
    sum_p = np.sum(p, axis=1, dtype=np.int64)

    both = m & p
    n2 = np.sum(both, axis=1, dtype=np.int64)
    c2 = both @ y_int
    del both

    n1 = sum_m + sum_p - 2 * n2
    n0 = n_samples - n1 - n2

    cm = m @ y_int
    cp = p @ y_int
    c1 = cm + cp - 2 * c2
    c0 = cases_total - c1 - c2

    n_counts = np.stack([n0, n1, n2], axis=1)
    c_counts = np.stack([c0, c1, c2], axis=1)
    return n_counts.astype(np.float64), c_counts.astype(np.float64)


def _compute_dosage_from_lai(
    maternal: np.ndarray,
    paternal: np.ndarray,
    ancestry_code: int,
) -> np.ndarray:
    """Materialize per-sample ancestry dosage from maternal/paternal LAI haplotypes."""
    return (
        (maternal == ancestry_code).astype(np.uint8)
        + (paternal == ancestry_code).astype(np.uint8)
    )


def _compute_linear_stats_from_lai(
    maternal: np.ndarray,
    paternal: np.ndarray,
    ancestry_code: int,
    y: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Compute sufficient statistics for OLS on dosage groups directly from haplotype arrays.

    For each window and dosage group k in {0, 1, 2}, computes:
      - n[w, k]:      count of samples with dosage k
      - sum_y[w, k]:  sum of y for those samples
      - sum_y2[w, k]: sum of y^2 for those samples

    These three arrays are sufficient to perform closed-form OLS.
    """
    n_samples = maternal.shape[1]

    m = maternal == ancestry_code
    p = paternal == ancestry_code

    both = m & p
    n2 = np.sum(both, axis=1, dtype=np.int64)

    sum_m = np.sum(m, axis=1, dtype=np.int64)
    sum_p = np.sum(p, axis=1, dtype=np.int64)
    n1 = sum_m + sum_p - 2 * n2
    n0 = n_samples - n1 - n2

    y_f64 = y.astype(np.float64, copy=False)
    y_sq = y_f64 * y_f64

    sum_y_total = float(np.sum(y_f64))
    sum_y2_total = float(np.sum(y_sq))

    # dosage=2: both maternal and paternal match
    sy2 = both.astype(np.float64) @ y_f64
    sy2_sq = both.astype(np.float64) @ y_sq
    del both

    # dosage>=1 via maternal or paternal match
    sy_m = m.astype(np.float64) @ y_f64
    sy_p = p.astype(np.float64) @ y_f64
    sy_m_sq = m.astype(np.float64) @ y_sq
    sy_p_sq = p.astype(np.float64) @ y_sq

    # dosage=1: exactly one haplotype matches
    sy1 = sy_m + sy_p - 2.0 * sy2
    sy1_sq = sy_m_sq + sy_p_sq - 2.0 * sy2_sq

    # dosage=0: the remainder
    sy0 = sum_y_total - sy1 - sy2
    sy0_sq = sum_y2_total - sy1_sq - sy2_sq

    n_counts = np.stack([n0, n1, n2], axis=1).astype(np.float64)
    sum_y = np.stack([sy0, sy1, sy2], axis=1)
    sum_y2 = np.stack([sy0_sq, sy1_sq, sy2_sq], axis=1)
    return n_counts, sum_y, sum_y2


[docs] def run_admixture_mapping( phe_path: Union[str, Path, PhenotypeObject], lai_source: Optional[Union[str, Path, LocalAncestryObject]] = None, results_path: Union[str, Path] = "admixmap.tsv.gz", phe_id: Optional[str] = None, batch_size: int = 256, keep_hla: bool = False, memory: Optional[int] = None, return_results: bool = True, quantitative: Optional[bool] = None, verbose: bool = False, covar: Optional[Union[str, Path, CovariateObject]] = None, covar_path: Optional[Union[str, Path]] = None, covar_col_nums: Optional[str] = None, covar_variance_standardize: bool = False, ci: Optional[float] = None, adjust: bool = False, keep_path: Optional[Union[str, Path]] = None, remove_path: Optional[Union[str, Path]] = None, ) -> pd.DataFrame: """Run window-level admixture mapping from a local ancestry file or LAI object. Args: phe_path: Phenotype file path or in-memory :class:`PhenotypeObject`. lai_source: Local ancestry source. Pass an MSP/FLARE file path or an in-memory :class:`LocalAncestryObject`. results_path: Output TSV path or directory (default: admixmap.tsv.gz). phe_id: Phenotype column name to analyze. Required when ``phe_path`` is a file path; inferred from ``PhenotypeObject.phenotype_name`` for in-memory phenotype input. covar: Covariate file path or in-memory :class:`CovariateObject`. ``covar_path`` is retained as a backward-compatible alias. """ if covar is not None and covar_path is not None: raise TypeError("Pass only one of `covar` or `covar_path`.") covar_source = covar if covar is not None else covar_path if lai_source is None: raise TypeError("run_admixture_mapping() missing required argument: 'lai_source'") if memory is not None and int(memory) < 2: raise MemoryError("--memory must be >= 2 MiB for internal admixture mapping.") if ci is not None and (ci <= 0.0 or ci >= 1.0): raise ValueError("--ci must be in the open interval (0, 1).") phenotype_obj, output_phe_id = _coerce_phenotype_source( phe_path, phe_id=phe_id, quantitative=quantitative, ) phe_samples = phenotype_obj.samples y = phenotype_obj.values trait_is_quantitative = bool(phenotype_obj.is_quantitative) keep_ids = _read_sample_list(keep_path) if keep_path is not None else None remove_ids = _read_sample_list(remove_path) if remove_path is not None else None covar_samples, _covar_names, covar_matrix = _coerce_covar_source( covar_source, col_nums=covar_col_nums, variance_standardize=covar_variance_standardize, ) if isinstance(lai_source, LocalAncestryObject): laiobj = lai_source lai_reader = None lai_samples = ( list(laiobj.samples) if laiobj.samples is not None else [str(hap)[:-2] for hap in laiobj.haplotypes[0::2]] ) ancestry_map = laiobj.ancestry_map else: laiobj = None lai_reader = LAIReader(lai_source) metadata = lai_reader.read_metadata() lai_samples = metadata.samples ancestry_map = metadata.ancestry_map lai_sample_indexes, y_aligned, aligned_samples, covar_aligned = _align_samples( lai_samples, phe_samples, y, quantitative=trait_is_quantitative, keep_ids=keep_ids, remove_ids=remove_ids, covar_samples=covar_samples, covar_matrix=covar_matrix, ) del aligned_samples covariates_present = covar_aligned is not None n_covar = int(covar_aligned.shape[1]) if covariates_present else 0 chunk_size = _compute_effective_chunk_size( batch_size=batch_size, n_samples=int(lai_sample_indexes.size), memory_mib=memory, covariates_present=covariates_present, quantitative=trait_is_quantitative, ) if laiobj is not None: ancestries = _resolve_ancestries(laiobj) else: if lai_reader is None: raise ValueError("Internal error: missing LAI reader.") ancestries = _resolve_ancestries_from_metadata( lai_reader=lai_reader, ancestry_map=ancestry_map, chunk_size=chunk_size, sample_indices=lai_sample_indexes, ) if not ancestries: raise ValueError("No ancestries available in LAI source.") obs_ct = int(y_aligned.size) rss_baseline_mb = _get_process_rss_mb() if memory is not None else None output_file = _resolve_output_path(results_path, output_phe_id) ci_cols: List[str] = [] if ci is not None: ci_suffix = _confidence_interval_label(ci) ci_cols = [f"L{ci_suffix}", f"U{ci_suffix}"] if trait_is_quantitative: core_columns = [ "#CHROM", "POS", "END", "ID", "REF", "ALT", "A1", "ANCESTRY", "TEST", "OBS_CT", "BETA", "SE", "T_STAT", "P", ] else: core_columns = [ "#CHROM", "POS", "END", "ID", "REF", "ALT", "A1", "ANCESTRY", "TEST", "OBS_CT", "BETA", "OR", "LOG(OR)_SE", "Z_STAT", "P", ] columns_without_adjust = core_columns + ci_cols + ["ERRCODE"] final_columns = core_columns + ci_cols + (["BONF", "FDR_BH"] if adjust else []) + ["ERRCODE"] covar_f64: Optional[np.ndarray] = None y_resid: Optional[np.ndarray] = None q_fwl: Optional[np.ndarray] = None if trait_is_quantitative: y_f64 = y_aligned.astype(np.float64, copy=False) if covariates_present: covar_f64 = covar_aligned.astype(np.float64, copy=False) y_resid, q_fwl = _prepare_fwl(y_f64, covar_f64) else: y_binary = y_aligned.astype(np.int64, copy=False) if covariates_present: covar_f64 = covar_aligned.astype(np.float64, copy=False) records: List[Dict[str, object]] = [] if return_results else [] collected_p_values: Optional[List[float]] = [] if adjust else None kept_window_counter: Dict[int, int] = {int(code): 0 for code, _ in ancestries} removed_hla_windows_total = 0 n_windows_processed_total = 0 chunk_index = 0 try: with _open_tsv_for_write(output_file) as handle: writer = csv.writer(handle, delimiter="\t") writer.writerow(columns_without_adjust) if verbose: source_name = "LocalAncestryObject" if laiobj is not None else "LAI file" print(f"Reading LAI source ({source_name})...", flush=True) if laiobj is not None: chunk_iter = _iter_laiobj_windows( laiobj, chunk_size=chunk_size, sample_indices=lai_sample_indexes, ) else: if lai_reader is None: raise ValueError("Internal error: missing LAI reader.") chunk_iter = lai_reader.iter_windows( chunk_size=chunk_size, sample_indices=lai_sample_indexes, ) for chunk in chunk_iter: n_win_processed = 0 chunk_lai = chunk["lai"] if chunk_lai.size == 0: continue chunk_chrom = chunk["chromosomes"] chunk_phys = chunk["physical_pos"] if chunk_phys is not None: chunk_starts = chunk_phys[:, 0].astype(np.int64, copy=False) chunk_ends = chunk_phys[:, 1].astype(np.int64, copy=False) else: chunk_starts = chunk["window_indexes"].astype(np.int64, copy=False) + 1 chunk_ends = chunk_starts.copy() if not keep_hla and chunk_phys is not None: keep_mask = np.ones(chunk_lai.shape[0], dtype=bool) for i, chrom in enumerate(chunk_chrom): chrom_int = _chromosome_as_int(chrom) if chrom_int != 6: continue if (HLA_START <= int(chunk_starts[i]) <= HLA_END) or (HLA_START <= int(chunk_ends[i]) <= HLA_END): keep_mask[i] = False removed_hla_windows_total += int((~keep_mask).sum()) if not np.any(keep_mask): continue chunk_lai = chunk_lai[keep_mask] chunk_chrom = chunk_chrom[keep_mask] chunk_starts = chunk_starts[keep_mask] chunk_ends = chunk_ends[keep_mask] _enforce_memory_budget(memory, rss_baseline_mb, context="chunk loading") maternal = chunk_lai[:, 0::2] paternal = chunk_lai[:, 1::2] n_windows_in_chunk = chunk_lai.shape[0] n_win_processed = n_windows_in_chunk if verbose and (chunk_index % 100 == 0): print(f" Chunk {chunk_index}: {n_windows_in_chunk} windows (total so far: {n_windows_processed_total + n_win_processed:,})", flush=True) chunk_index += 1 chunk_chrom_norm = [_normalize_chromosome(chunk_chrom[i]) for i in range(n_windows_in_chunk)] chunk_starts_int = chunk_starts.tolist() chunk_ends_int = chunk_ends.tolist() for ancestry_code, ancestry_label in ancestries: ancestry_code_int = int(ancestry_code) if trait_is_quantitative: if covariates_present: if covar_f64 is None or y_resid is None or q_fwl is None: raise ValueError("Internal error: missing covariate projection state.") dosage_batch = _compute_dosage_from_lai( maternal, paternal, ancestry_code_int, ) beta_arr, se_arr, t_arr, p_arr, errcode_arr = _fit_linear_batch_with_covariates( dosage_batch, y_resid, q_fwl, n_covar=n_covar, ) df_linear = float(obs_ct - (2 + n_covar)) else: n_batch, sy_batch, sy2_batch = _compute_linear_stats_from_lai( maternal, paternal, ancestry_code_int, y_f64, ) beta_arr, se_arr, t_arr, p_arr, errcode_arr = _fit_linear_batch( n_batch, sy_batch, sy2_batch, ) df_linear = float(obs_ct - 2) if ci is not None: ci_low_arr, ci_high_arr = _compute_linear_ci_beta( beta_arr, se_arr, ci=ci, df=df_linear, ) else: ci_low_arr = ci_high_arr = None current_index = kept_window_counter[ancestry_code_int] rows_buf: List[List[object]] = [] for i in range(n_windows_in_chunk): row_values = [ chunk_chrom_norm[i], chunk_starts_int[i], chunk_ends_int[i], f"w{current_index + i + 1}_{ancestry_label}", "N", ancestry_label, ancestry_label, ancestry_label, "LINEAR", obs_ct, beta_arr[i], se_arr[i], t_arr[i], p_arr[i], ] if ci is not None and ci_low_arr is not None and ci_high_arr is not None: row_values.extend([ci_low_arr[i], ci_high_arr[i]]) row_values.append(errcode_arr[i]) rows_buf.append(row_values) if return_results: records.append(dict(zip(columns_without_adjust, row_values))) if collected_p_values is not None: collected_p_values.append(float(p_arr[i])) writer.writerows(rows_buf) else: if covariates_present: if covar_f64 is None: raise ValueError("Internal error: missing aligned covariate matrix.") dosage_batch = _compute_dosage_from_lai( maternal, paternal, ancestry_code_int, ) beta_arr, se_arr, z_arr, p_arr, test_arr, errcode_arr = _fit_logistic_batch_with_covariates( dosage_batch, y_binary, covar_f64, ) else: n_counts_batch, c_counts_batch = _compute_group_counts_from_lai( maternal, paternal, ancestry_code_int, y_binary, ) beta_arr, se_arr, z_arr, p_arr, test_arr, errcode_arr = _fit_logistic_batch( n_counts_batch, c_counts_batch, ) or_arr = _odds_ratio_batch(beta_arr) if ci is not None: ci_low_arr, ci_high_arr = _compute_logistic_ci_or(beta_arr, se_arr, ci=ci) else: ci_low_arr = ci_high_arr = None current_index = kept_window_counter[ancestry_code_int] rows_buf = [] for i in range(n_windows_in_chunk): row_values = [ chunk_chrom_norm[i], chunk_starts_int[i], chunk_ends_int[i], f"w{current_index + i + 1}_{ancestry_label}", "N", ancestry_label, ancestry_label, ancestry_label, test_arr[i], obs_ct, beta_arr[i], or_arr[i], se_arr[i], z_arr[i], p_arr[i], ] if ci is not None and ci_low_arr is not None and ci_high_arr is not None: row_values.extend([ci_low_arr[i], ci_high_arr[i]]) row_values.append(errcode_arr[i]) rows_buf.append(row_values) if return_results: records.append(dict(zip(columns_without_adjust, row_values))) if collected_p_values is not None: collected_p_values.append(float(p_arr[i])) writer.writerows(rows_buf) kept_window_counter[ancestry_code_int] = current_index + n_windows_in_chunk _enforce_memory_budget(memory, rss_baseline_mb, context=f"ancestry {ancestry_label}") n_windows_processed_total += n_win_processed except Exception: try: output_file.unlink() except FileNotFoundError: pass raise if verbose and n_windows_processed_total > 0: print(f" Done. Processed {n_windows_processed_total:,} windows.", flush=True) if not keep_hla: log.info("Removed %s HLA windows.", removed_hla_windows_total) if adjust: if collected_p_values is None: raise ValueError("Internal error: adjusted p-values requested but collection is unavailable.") _apply_multiple_testing_adjustment(output_file, collected_p_values) if return_results and records: bonf_arr, fdr_arr = _compute_multiple_testing_adjustments( np.asarray(collected_p_values, dtype=np.float64) ) for rec, bonf_val, fdr_val in zip(records, bonf_arr.tolist(), fdr_arr.tolist()): rec["BONF"] = bonf_val rec["FDR_BH"] = fdr_val if return_results: results = pd.DataFrame.from_records(records) results = results.reindex(columns=final_columns) else: results = pd.DataFrame(columns=final_columns) log.info("Admixture mapping results written to %s", output_file) return results
def admixmap(argv: Sequence[str]): args = parse_admixmap_args(argv) return run_admixmap_command(args) def run_admixmap_command(args: argparse.Namespace) -> int: run_admixture_mapping( phe_path=args.phe_path, lai_source=args.lai_path, results_path=args.results_path, phe_id=args.phe_id, batch_size=args.batch_size, keep_hla=args.keep_hla, memory=args.memory, return_results=False, quantitative=args.quantitative, verbose=args.verbose, covar_path=args.covar_path, covar_col_nums=args.covar_col_nums, covar_variance_standardize=args.covar_variance_standardize, ci=args.ci, adjust=args.adjust, keep_path=args.keep_path, remove_path=args.remove_path, ) manhattan_plot_path = getattr(args, "manhattan_plot", None) qq_plot_path = getattr(args, "qq_plot", None) if manhattan_plot_path is not None or qq_plot_path is not None: import matplotlib matplotlib.use("Agg", force=True) actual_results = _resolve_output_path(args.results_path, args.phe_id) if manhattan_plot_path is not None: from snputils.visualization.manhattan_plot import manhattan_plot manhattan_plot(str(actual_results), save=True, output_filename=manhattan_plot_path) if qq_plot_path is not None: from snputils.visualization.qq_plot import qq_plot qq_plot(str(actual_results), save=True, output_filename=qq_plot_path) return 0