snputils.processing.mdpca

   1import pathlib
   2import os
   3import time
   4import gc
   5import logging
   6import logging.config
   7import numpy as np
   8import copy
   9from typing import Optional, Dict, List, Union
  10from sklearn.decomposition import TruncatedSVD
  11
  12from snputils.snp.genobj.snpobj import SNPObject
  13from snputils.ancestry.genobj.local import LocalAncestryObject
  14from ._utils.gen_tools import process_calldata_gt, process_labels_weights
  15from ._utils.iterative_svd import IterativeSVD
  16
  17
  18class mdPCA:
  19    """
  20    A class for performing missing data principal component analysis (mdPCA) on SNP data.
  21
  22    The mdPCA class focuses on genotype segments from the ancestry of interest when the `is_masked` flag is set to `True`. It offers 
  23    flexible processing options, allowing either separate handling of masked haplotype strands or combining (averaging) strands into a 
  24    single composite representation for each individual. Moreover, the analysis can be performed on individual-level data, group-level SNP 
  25    frequencies, or a combination of both.
  26
  27    If the `snpobj`, `laiobj`, `labels_file`, and `ancestry` parameters are all provided during instantiation, the `fit_transform` method 
  28    will be automatically called, applying the specified mdPCA method to transform the data upon instantiation.
  29    """
  30    def __init__(
  31        self,
  32        method: str = 'weighted_cov_pca',
  33        snpobj: Optional['SNPObject'] = None,
  34        laiobj: Optional['LocalAncestryObject'] = None,
  35        labels_file: Optional[str] = None,
  36        ancestry: Optional[Union[int, str]] = None,
  37        is_masked: bool = True,
  38        average_strands: bool = False,
  39        force_nan_incomplete_strands: bool = False,
  40        is_weighted: bool = False,
  41        groups_to_remove: List[str] = None,
  42        min_percent_snps: float = 4,
  43        group_snp_frequencies_only: bool = True,
  44        save_masks: bool = False,
  45        load_masks: bool = False,
  46        masks_file: Union[str, pathlib.Path] = 'masks.npz',
  47        output_file: Union[str, pathlib.Path] = 'output.tsv',
  48        covariance_matrix_file: Optional[str] = None,
  49        n_components: int = 2,
  50        rsid_or_chrompos: int = 2,
  51        percent_vals_masked: float = 0
  52    ):
  53        """
  54        Args:
  55            method (str, default='weighted_cov_pca'): 
  56                The PCA method to use for dimensionality reduction. Options include:
  57                - `'weighted_cov_pca'`: 
  58                    Simple covariance-based PCA, weighted by sample strengths.
  59                - `'regularized_optimization_ils'`: 
  60                    Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of 
  61                    missing samples using the original covariance matrix (considering only relevant elements not missing in 
  62                    the original covariance matrix for those samples).
  63                - `'cov_matrix_imputation'`: 
  64                    Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values 
  65                    using the Iterative SVD imputation method.
  66                - `'cov_matrix_imputation_ils'`: 
  67                    The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space 
  68                    given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just 
  69                    as done in 'regularized_optimization_ils'.
  70                - `'nonmissing_pca_ils'`: 
  71                    The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto 
  72                    the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as 
  73                    done in 'regularized_optimization_ils'.
  74            snpobj (SNPObject, optional): 
  75                A SNPObject instance.
  76            laiobj (LAIObject, optional): 
  77                A LAIObject instance.
  78            labels_file (str, optional): 
  79                Path to a `.tsv` file with metadata on individuals, including population labels, optional weights, and groupings. 
  80                The `indID` column must contain unique individual identifiers matching those in `laiobj` and `snpobj` for proper alignment. 
  81                The `label` column assigns population groups. If `is_weighted=True`, a `weight` column must be provided, assigning a weight to 
  82                each individual, where those with a weight of zero are removed. Optional columns include `combination` and `combination_weight` 
  83                to aggregate individuals into combined groups, where SNP frequencies represent their sequences. The `combination` column assigns 
  84                each individual to a specific group (0 for no combination, 1 for the first group, 2 for the second, etc.). All members of a group 
  85                must share the same `label` and `combination_weight`. If `combination_weight` column is not provided, the combinations are 
  86                assigned a default weight of `1`. Individuals excluded via `groups_to_remove` or those falling below `min_percent_snps` are removed 
  87                from the analysis.
  88            ancestry (int or str, optional): 
  89                Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. The ancestry input can be:
  90                - An integer (e.g., 0, 1, 2).
  91                - A string representation of an integer (e.g., '0', '1').
  92                - A string matching one of the ancestry map values (e.g., 'Africa').
  93            is_masked (bool, default=True): 
  94                If `True`, applies ancestry-specific masking to the genotype matrix, retaining only genotype data 
  95                corresponding to the specified `ancestry`. If `False`, uses the full, unmasked genotype matrix.
  96            average_strands (bool, default=False): 
  97                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
  98            force_nan_incomplete_strands (bool): 
  99                If `True`, sets the result to NaN if either haplotype in a pair is NaN. 
 100                Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1).
 101            is_weighted (bool, default=False): 
 102                If `True`, assigns individual weights from the `weight` column in `labels_file`. Otherwise, all individuals have equal weight of `1`.
 103            groups_to_remove (list of str, optional): 
 104                List with groups to exclude from analysis. Example: ['group1', 'group2'].
 105            min_percent_snps (float, default=4): 
 106                Minimum percentage of SNPs that must be known for an individual and of the ancestry of interest to be included in the analysis.
 107                All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
 108            group_snp_frequencies_only (bool, default=True):
 109                If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when `is_weighted` is 
 110                set to True and a `combination` column is provided in the `labels_file`,  meaning individuals are aggregated into groups based on their assigned 
 111                labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies when 
 112                `is_weighted` is True and a `combination` column is provided.
 113            save_masks (bool, default=False): 
 114                True if the masked matrices are to be saved in a `.npz` file, or False otherwise.
 115            load_masks (bool, default=False): 
 116                True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise.
 117            masks_file (str or pathlib.Path, default='masks.npz'): 
 118                Path to the `.npz` file used for saving/loading masked matrices.
 119            output_file (str or pathlib.Path, default='output.tsv'): 
 120                Path to the output `.tsv` file where mdPCA results are saved.
 121            covariance_matrix_file (str, optional): 
 122                Path to save the covariance matrix file in `.npy` format. If None, the covariance matrix is not saved. Default is None.
 123            n_components (int, default=2): 
 124                The number of principal components.
 125            rsid_or_chrompos (int, default=2): 
 126                Format indicator for SNP IDs in `self.__X_new_`. Use 1 for `rsID` format or 2 for `chromosome_position`.
 127            percent_vals_masked (float, default=0): 
 128                Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 
 129                `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`.
 130        """
 131        self.__snpobj = snpobj
 132        self.__laiobj = laiobj
 133        self.__labels_file = labels_file
 134        self.__ancestry = self._define_ancestry(ancestry, laiobj.ancestry_map)
 135        self.__method = method
 136        self.__is_masked = is_masked
 137        self.__average_strands = average_strands
 138        self.__force_nan_incomplete_strands = force_nan_incomplete_strands
 139        self.__is_weighted = is_weighted
 140        self.__groups_to_remove = groups_to_remove
 141        self.__min_percent_snps = min_percent_snps
 142        self.__group_snp_frequencies_only = group_snp_frequencies_only
 143        self.__save_masks = save_masks
 144        self.__load_masks = load_masks
 145        self.__masks_file = masks_file
 146        self.__output_file = output_file
 147        self.__covariance_matrix_file = covariance_matrix_file
 148        self.__n_components = n_components
 149        self.__rsid_or_chrompos = rsid_or_chrompos
 150        self.__percent_vals_masked = percent_vals_masked
 151        self.__X_new_ = None  # Store transformed SNP data
 152        self.__haplotypes_ = None  # Store haplotypes of X_new_ (after filtering if min_percent_snps > 0)
 153        self.__samples_ = None  # Store samples of X_new_ (after filtering if min_percent_snps > 0)
 154        self.__variants_id_ = None  # Store variants ID (after filtering SNPs not in laiobj)
 155
 156        # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided
 157        if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None:
 158            self.fit_transform(snpobj, laiobj, labels_file, ancestry)
 159
 160    def __getitem__(self, key):
 161        """
 162        To access an attribute of the class using the square bracket notation,
 163        similar to a dictionary.
 164        """
 165        try:
 166            return getattr(self, key)
 167        except AttributeError:
 168            raise KeyError(f'Invalid key: {key}')
 169
 170    def __setitem__(self, key, value):
 171        """
 172        To set an attribute of the class using the square bracket notation,
 173        similar to a dictionary.
 174        """
 175        try:
 176            setattr(self, key, value)
 177        except AttributeError:
 178            raise KeyError(f'Invalid key: {key}')
 179
 180    @property
 181    def method(self) -> str:
 182        """
 183        Retrieve `method`.
 184        
 185        Returns:
 186            **str:** The PCA method to use for dimensionality reduction.
 187        """
 188        return self.__method
 189
 190    @method.setter
 191    def method(self, x: str) -> None:
 192        """
 193        Update `method`.
 194        """
 195        self.__method = x
 196
 197    @property
 198    def snpobj(self) -> Optional['SNPObject']:
 199        """
 200        Retrieve `snpobj`.
 201        
 202        Returns:
 203            **SNPObject:** A SNPObject instance.
 204        """
 205        return self.__snpobj
 206
 207    @snpobj.setter
 208    def snpobj(self, x: 'SNPObject') -> None:
 209        """
 210        Update `snpobj`.
 211        """
 212        self.__snpobj = x
 213
 214    @property
 215    def laiobj(self) -> Optional['LocalAncestryObject']:
 216        """
 217        Retrieve `laiobj`.
 218        
 219        Returns:
 220            **LocalAncestryObject:** A LocalAncestryObject instance.
 221        """
 222        return self.__laiobj
 223
 224    @laiobj.setter
 225    def laiobj(self, x: 'LocalAncestryObject') -> None:
 226        """
 227        Update `laiobj`.
 228        """
 229        self.__laiobj = x
 230
 231    @property
 232    def labels_file(self) -> Optional[str]:
 233        """
 234        Retrieve `labels_file`.
 235        
 236        Returns:
 237            **str:** Path to the labels file in `.tsv` format.
 238        """
 239        return self.__labels_file
 240
 241    @labels_file.setter
 242    def labels_file(self, x: str) -> None:
 243        """
 244        Update `labels_file`.
 245        """
 246        self.__labels_file = x
 247
 248    @property
 249    def ancestry(self) -> Optional[str]:
 250        """
 251        Retrieve `ancestry`.
 252        
 253        Returns:
 254            **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
 255        """
 256        return self.__ancestry
 257
 258    @ancestry.setter
 259    def ancestry(self, x: str) -> None:
 260        """
 261        Update `ancestry`.
 262        """
 263        self.__ancestry = x
 264
 265    @property
 266    def is_masked(self) -> bool:
 267        """
 268        Retrieve `is_masked`.
 269        
 270        Returns:
 271            **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
 272        """
 273        return self.__is_masked
 274
 275    @is_masked.setter
 276    def is_masked(self, x: bool) -> None:
 277        """
 278        Update `is_masked`.
 279        """
 280        self.__is_masked = x
 281
 282    @property
 283    def average_strands(self) -> bool:
 284        """
 285        Retrieve `average_strands`.
 286        
 287        Returns:
 288            **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
 289        """
 290        return self.__average_strands
 291
 292    @average_strands.setter
 293    def average_strands(self, x: bool) -> None:
 294        """
 295        Update `average_strands`.
 296        """
 297        self.__average_strands = x
 298
 299    @property
 300    def force_nan_incomplete_strands(self) -> bool:
 301        """
 302        Retrieve `force_nan_incomplete_strands`.
 303        
 304        Returns:
 305            **bool**: If `True`, sets the result to NaN if either haplotype in a pair is NaN.
 306                      Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1).
 307        """
 308        return self.__force_nan_incomplete_strands
 309
 310    @force_nan_incomplete_strands.setter
 311    def force_nan_incomplete_strands(self, x: bool) -> None:
 312        """
 313        Update `force_nan_incomplete_strands`.
 314        """
 315        self.__force_nan_incomplete_strands = x
 316
 317    @property
 318    def is_weighted(self) -> bool:
 319        """
 320        Retrieve `is_weighted`.
 321        
 322        Returns:
 323            **bool:** True if weights are provided in the labels file, or False otherwise.
 324        """
 325        return self.__is_weighted
 326
 327    @is_weighted.setter
 328    def is_weighted(self, x: bool) -> None:
 329        """
 330        Update `is_weighted`.
 331        """
 332        self.__is_weighted = x
 333
 334    @property
 335    def groups_to_remove(self) -> Dict[int, List[str]]:
 336        """
 337        Retrieve `groups_to_remove`.
 338        
 339        Returns:
 340            **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 
 341                lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`.
 342        """
 343        return self.__groups_to_remove
 344
 345    @groups_to_remove.setter
 346    def groups_to_remove(self, x: Dict[int, List[str]]) -> None:
 347        """
 348        Update `groups_to_remove`.
 349        """
 350        self.__groups_to_remove = x
 351
 352    @property
 353    def min_percent_snps(self) -> float:
 354        """
 355        Retrieve `min_percent_snps`.
 356        
 357        Returns:
 358            **float:** 
 359                Minimum percentage of SNPs that must be known for an individual to be included in the analysis.
 360                All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
 361        """
 362        return self.__min_percent_snps
 363
 364    @min_percent_snps.setter
 365    def min_percent_snps(self, x: float) -> None:
 366        """
 367        Update `min_percent_snps`.
 368        """
 369        self.__min_percent_snps = x
 370
 371    @property
 372    def group_snp_frequencies_only(self) -> bool:
 373        """
 374        Retrieve `group_snp_frequencies_only`.
 375        
 376        Returns:
 377            **bool:** 
 378                If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies 
 379                when `is_weighted` is set to True and a `combination` column is provided in the `labels_file`,  meaning individuals are 
 380                aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone 
 381                or on both individual-level and group-level SNP frequencies when `is_weighted` is True and a `combination` column is provided.
 382        """
 383        return self.__group_snp_frequencies_only
 384
 385    @group_snp_frequencies_only.setter
 386    def group_snp_frequencies_only(self, x: bool) -> None:
 387        """
 388        Update `group_snp_frequencies_only`.
 389        """
 390        self.__group_snp_frequencies_only = x
 391
 392    @property
 393    def save_masks(self) -> bool:
 394        """
 395        Retrieve `save_masks`.
 396        
 397        Returns:
 398            **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise.
 399        """
 400        return self.__save_masks
 401
 402    @save_masks.setter
 403    def save_masks(self, x: bool) -> None:
 404        """
 405        Update `save_masks`.
 406        """
 407        self.__save_masks = x
 408
 409    @property
 410    def load_masks(self) -> bool:
 411        """
 412        Retrieve `load_masks`.
 413        
 414        Returns:
 415            **bool:** 
 416                True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 
 417                by `masks_file`, or False otherwise.
 418        """
 419        return self.__load_masks
 420
 421    @load_masks.setter
 422    def load_masks(self, x: bool) -> None:
 423        """
 424        Update `load_masks`.
 425        """
 426        self.__load_masks = x
 427
 428    @property
 429    def masks_file(self) -> Union[str, pathlib.Path]:
 430        """
 431        Retrieve `masks_file`.
 432        
 433        Returns:
 434            **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices.
 435        """
 436        return self.__masks_file
 437
 438    @masks_file.setter
 439    def masks_file(self, x: Union[str, pathlib.Path]) -> None:
 440        """
 441        Update `masks_file`.
 442        """
 443        self.__masks_file = x
 444
 445    @property
 446    def output_file(self) -> Union[str, pathlib.Path]:
 447        """
 448        Retrieve `output_file`.
 449        
 450        Returns:
 451            **str or pathlib.Path:** Path to the output `.tsv` file where mdPCA results are saved.
 452        """
 453        return self.__output_file
 454
 455    @output_file.setter
 456    def output_file(self, x: Union[str, pathlib.Path]) -> None:
 457        """
 458        Update `output_file`.
 459        """
 460        self.__output_file = x
 461
 462    @property
 463    def covariance_matrix_file(self) -> Optional[str]:
 464        """
 465        Retrieve `covariance_matrix_file`.
 466        
 467        Returns:
 468            **str:** Path to save the covariance matrix file in `.npy` format.
 469        """
 470        return self.__covariance_matrix_file
 471    
 472    @covariance_matrix_file.setter
 473    def covariance_matrix_file(self, x: Optional[str]) -> None:
 474        """
 475        Update `covariance_matrix_file`.
 476        """
 477        self.__covariance_matrix_file = x
 478
 479    @property
 480    def n_components(self) -> int:
 481        """
 482        Retrieve `n_components`.
 483        
 484        Returns:
 485            **int:** The number of principal components.
 486        """
 487        return self.__n_components
 488
 489    @n_components.setter
 490    def n_components(self, x: int) -> None:
 491        """
 492        Update `n_components`.
 493        """
 494        self.__n_components = x
 495
 496    @property
 497    def rsid_or_chrompos(self) -> int:
 498        """
 499        Retrieve `rsid_or_chrompos`.
 500        
 501        Returns:
 502            **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`.
 503        """
 504        return self.__rsid_or_chrompos
 505
 506    @rsid_or_chrompos.setter
 507    def rsid_or_chrompos(self, x: int) -> None:
 508        """
 509        Update `rsid_or_chrompos`.
 510        """
 511        self.__rsid_or_chrompos = x
 512
 513    @property
 514    def percent_vals_masked(self) -> float:
 515        """
 516        Retrieve `percent_vals_masked`.
 517        
 518        Returns:
 519            **float:** 
 520                Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 
 521                `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`.
 522        """
 523        return self.__percent_vals_masked
 524
 525    @percent_vals_masked.setter
 526    def percent_vals_masked(self, x: float) -> None:
 527        """
 528        Update `percent_vals_masked`.
 529        """
 530        self.__percent_vals_masked = x
 531
 532    @property
 533    def X_new_(self) -> Optional[np.ndarray]:
 534        """
 535        Retrieve `X_new_`.
 536
 537        Returns:
 538            **array of shape (n_samples, n_components):** 
 539                The transformed SNP data projected onto the `n_components` principal components.
 540                n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 
 541                (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 
 542                `(n_samples * 2, n_components)`.
 543        """
 544        return self.__X_new_
 545
 546    @X_new_.setter
 547    def X_new_(self, x: np.ndarray) -> None:
 548        """
 549        Update `X_new_`.
 550        """
 551        self.__X_new_ = x
 552
 553    @property
 554    def haplotypes_(self) -> Optional[List[str]]:
 555        """
 556        Retrieve `haplotypes_`.
 557
 558        Returns:
 559            list of str:
 560                A list of unique haplotype identifiers.
 561        """
 562        if isinstance(self.__haplotypes_, np.ndarray):
 563            return self.__haplotypes_.ravel().tolist()  # Flatten and convert NumPy array to a list
 564        elif isinstance(self.__haplotypes_, list):
 565            if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray):
 566                return self.__haplotypes_[0].ravel().tolist()  # Handle list containing a single array
 567            return self.__haplotypes_  # Already a flat list
 568        elif self.__haplotypes_ is None:
 569            return None  # If no haplotypes are set
 570        else:
 571            raise TypeError("`haplotypes_` must be a list or a NumPy array.")
 572
 573    @haplotypes_.setter
 574    def haplotypes_(self, x: Union[np.ndarray, List[str]]) -> None:
 575        """
 576        Update `haplotypes_`.
 577        """
 578        if isinstance(x, np.ndarray):
 579            self.__haplotypes_ = x.ravel().tolist()  # Flatten and convert to a list
 580        elif isinstance(x, list):
 581            if len(x) == 1 and isinstance(x[0], np.ndarray):  # Handle list containing a single array
 582                self.__haplotypes_ = x[0].ravel().tolist()
 583            else:
 584                self.__haplotypes_ = x  # Use directly if already a list
 585        else:
 586            raise TypeError("`x` must be a list or a NumPy array.")
 587
 588    @property
 589    def samples_(self) -> Optional[List[str]]:
 590        """
 591        Retrieve `samples_`.
 592
 593        Returns:
 594            list of str:
 595                A list of sample identifiers based on `haplotypes_` and `average_strands`.
 596        """
 597        haplotypes = self.haplotypes_
 598        if haplotypes is None:
 599            return None
 600        if self.__average_strands:
 601            return haplotypes
 602        else:
 603            return [x[:-2] for x in haplotypes]
 604
 605    @property
 606    def variants_id_(self) -> Optional[np.ndarray]:
 607        """
 608        Retrieve `variants_id_`.
 609
 610        Returns:
 611            **array of shape (n_snp,):** 
 612                An array containing unique identifiers (IDs) for each SNP,
 613                potentially reduced if there are SNPs not present in the `laiobj`.
 614                The format will depend on `rsid_or_chrompos`.
 615        """
 616        return self.__variants_id_
 617
 618    @variants_id_.setter
 619    def variants_id_(self, x: np.ndarray) -> None:
 620        """
 621        Update `variants_id_`.
 622        """
 623        self.__variants_id_ = x
 624
 625    @property
 626    def n_haplotypes(self) -> Optional[int]:
 627        """
 628        Retrieve `n_haplotypes`.
 629
 630        Returns:
 631            **int:**
 632                The total number of haplotypes, potentially reduced if filtering is applied 
 633                (`min_percent_snps > 0`).
 634        """
 635        return len(self.haplotypes_)
 636
 637    @property
 638    def n_samples(self) -> Optional[int]:
 639        """
 640        Retrieve `n_samples`.
 641
 642        Returns:
 643            **int:**
 644                The total number of samples, potentially reduced if filtering is applied 
 645                (`min_percent_snps > 0`).
 646        """
 647        return len(np.unique(self.samples_))
 648
 649    def copy(self) -> 'mdPCA':
 650        """
 651        Create and return a copy of `self`.
 652
 653        Returns:
 654            **mdPCA:** 
 655                A new instance of the current object.
 656        """
 657        return copy.copy(self)
 658
 659    @staticmethod
 660    def _define_ancestry(ancestry, ancestry_map):
 661        """
 662        Determine the ancestry index based on different input types.
 663
 664        Args:
 665            ancestry (int or str): The ancestry input, which can be:
 666                - An integer (e.g., 0, 1, 2).
 667                - A string representation of an integer (e.g., '0', '1').
 668                - A string matching one of the ancestry map values (e.g., 'Africa').
 669            ancestry_map (dict): A dictionary mapping ancestry indices (as strings) to ancestry names.
 670
 671        Returns:
 672            int: The corresponding ancestry index.
 673        """
 674        if isinstance(ancestry, int):  
 675            return ancestry  
 676        elif isinstance(ancestry, str) and ancestry.isdigit():  
 677            return int(ancestry)  
 678        elif ancestry in ancestry_map.values():  
 679            return int(next(key for key, value in ancestry_map.items() if value == ancestry))  
 680        else:  
 681            raise ValueError(f"Invalid ancestry input: {ancestry}")
 682
 683    def _load_mask_file(self):
 684        """
 685        Load previously saved masked genotype data from an `.npz` file.
 686        """
 687        mask_files = np.load(self.masks_file, allow_pickle=True)
 688        mask = mask_files['mask']
 689        rs_ID_list = mask_files['rs_ID_list']
 690        ind_ID_list = mask_files['ind_ID_list']
 691        labels = mask_files['labels']
 692        weights = mask_files['weights']
 693        return mask, rs_ID_list, ind_ID_list, labels, weights
 694
 695    @staticmethod
 696    def _compute_strength_vector(X):
 697        strength_vector = np.sum(~np.isnan(X), axis=1) / X.shape[1]
 698        return strength_vector
 699
 700    @staticmethod
 701    def _compute_strength_matrix(X):
 702        notmask = (~np.isnan(X)).astype(np.float32)
 703        strength_matrix = np.dot(notmask, notmask.T)
 704        strength_matrix /= X.shape[1]
 705        return strength_matrix
 706
    def _cov(self, x):
        """
        Pairwise second-moment matrix of the rows of `x`, ignoring missing entries.

        Each entry (i, j) is sum(x_i * x_j over jointly observed columns) divided
        by (count of jointly observed columns - 1). Note the data is NOT demeaned
        here; centering is applied separately via `_demean`.

        Args:
            x (np.ma.MaskedArray or array-like): 2D samples-by-features matrix;
                masked entries are treated as missing. Assumes masked positions
                hold NaN in the underlying data buffer, since the strength
                helpers below detect missingness via `np.isnan` — TODO confirm
                for callers other than `_weighted_cov_pca`.

        Returns:
            tuple:
                - np.ndarray: (n_rows, n_rows) covariance-style matrix.
                - np.ndarray: per-row fraction of observed entries.
                - np.ndarray: pairwise fraction of jointly observed entries.
        """
        ddof = 1
        x = np.ma.array(x, ndmin=2, copy=True, dtype=np.float32)
        xmask = np.ma.getmaskarray(x)
        xnotmask = np.logical_not(xmask).astype(np.float32)
        # Per-pair count of jointly observed columns minus ddof: the divisor
        # for the corresponding covariance entry.
        fact = np.dot(xnotmask, xnotmask.T) * 1. - ddof
        del xnotmask
        gc.collect()  # free the large indicator matrix before the next big dot product
        result = (np.ma.dot(x, x.T, strict=False) / fact).squeeze()
        x = x.data
        strength_vec = self._compute_strength_vector(x)
        strength_mat = self._compute_strength_matrix(x)
        return result.data, strength_vec, strength_mat
 720
 721    @staticmethod
 722    def _demean(S, w):
 723        w_sum = np.matmul(w, S)
 724        w_rowsum = w_sum.reshape((1, S.shape[0]))
 725        w_colsum = w_sum.reshape((S.shape[0], 1))
 726        w_totalsum = np.dot(w_sum, w)
 727        S -= (w_rowsum + w_colsum) - w_totalsum
 728        return S
 729
 730    @staticmethod
 731    def _iterative_svd_matrix_completion(cov):
 732        num_masked = np.isnan(cov).sum()
 733        if num_masked > 0:
 734            start_time = time.time()
 735            start_rank = 1
 736            end_rank = 10
 737            rank = 5
 738            choose_best = True
 739            num_cores = 5
 740            cov_complete = IterativeSVD(start_rank=start_rank, end_rank=end_rank, rank=rank, choose_best=choose_best, num_cores=num_cores).fit_transform(cov)
 741            logging.info("Iterative SVD --- %s seconds ---" % (time.time() - start_time))
 742        else:
 743            cov_complete = cov
 744        return cov_complete
 745
    def _weighted_cov_pca(self, X_incomplete, weights, covariance_matrix_file, n_components):
        """
        Weighted covariance PCA on possibly-incomplete data.

        Builds the pairwise covariance matrix over observed entries, applies
        weighted double-centering, eigen-decomposes the weight-scaled matrix
        sqrt(W) S sqrt(W) via truncated SVD, and maps the components back to the
        unweighted sample space.

        Args:
            X_incomplete (np.ndarray): (n_samples, n_snps) matrix with NaN for
                missing values.
            weights (np.ndarray): (n_samples,) per-sample weights; must be
                strictly positive for the `inv(sqrt(W))` step below.
            covariance_matrix_file (str or None): If truthy, path where the
                centered covariance matrix is saved in `.npy` format.
            n_components (int): Number of principal components to keep.

        Returns:
            np.ndarray: (n_samples, n_components) projection T = U * sqrt(D).
        """
        start_time = time.time()
        X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
        S, _, _ = self._cov(X_incomplete)
        logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
        weights_normalized = weights / weights.sum()
        # Center S under the normalized weights before the weighted decomposition.
        S = self._demean(S, weights_normalized)
        W = np.diag(weights)
        WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W))
        if covariance_matrix_file:
            # NOTE(review): `S` is a plain ndarray here, so `S.data` is a
            # memoryview of its buffer; np.save coerces it back to an array.
            # Saving `S` directly would be equivalent and clearer.
            np.save(covariance_matrix_file, S.data)
        svd = TruncatedSVD(n_components, algorithm="arpack")
        svd.fit(WSW)
        # Undo the sqrt(W) scaling to recover eigenvectors in sample space.
        U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T
        D = np.diag(np.sqrt(svd.singular_values_))
        T = np.matmul(U, D)
        total_var = np.trace(S)
        for i in range(n_components):
            pc_percentvar = 100 * svd.singular_values_[i] / total_var
            logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
        return T
 767
 768    @staticmethod
 769    def _create_validation_mask(X_incomplete, percent_inds):
 770        np.random.seed(1)
 771        masked_rows = np.isnan(X_incomplete).any(axis=1)
 772        masked_inds = np.flatnonzero(masked_rows)
 773        X_masked = X_incomplete[masked_rows] 
 774        percent_masked = 100 * np.isnan(X_masked).sum() / (X_masked.shape[0] * X_masked.shape[1])
 775        unmasked_rows = ~masked_rows
 776        X_unmasked = X_incomplete[unmasked_rows]
 777        masked_rows = np.random.choice(range(X_unmasked.shape[0]), size=int(X_unmasked.shape[0] * percent_inds / 100), replace=False)
 778        X_masked_rows = X_unmasked[masked_rows,:]
 779        mask = np.zeros(X_masked_rows.shape[0] * X_masked_rows.shape[1], dtype=np.int8)
 780        mask[:int(X_masked_rows.shape[0] * X_masked_rows.shape[1] * percent_masked / 100)] = 1
 781        np.random.shuffle(mask)
 782        mask = mask.astype(bool)
 783        mask = mask.reshape(X_masked_rows.shape)
 784        X_masked_rows[mask] = np.nan
 785        X_unmasked[masked_rows] = X_masked_rows
 786        X_incomplete[unmasked_rows] = X_unmasked
 787        masked_rows_new = np.isnan(X_incomplete).any(axis=1)
 788        masked_inds_new = np.flatnonzero(masked_rows_new)
 789        masked_inds_val = sorted(list(set(masked_inds_new) - set(masked_inds)))
 790        return X_incomplete, masked_inds_val
 791
 792    @staticmethod    
 793    def _create_cov_mask(cov, strength_mat, percent_vals_masked):
 794        cov_flattened = cov.reshape(-1).copy()
 795        num_vals_masked = int(percent_vals_masked * len(cov_flattened) / 100)
 796        pos_masked = strength_mat.reshape(-1).argsort()[:num_vals_masked]
 797        cov_flattened[pos_masked] = np.nan
 798        cov_masked = cov_flattened.reshape(cov.shape)
 799        return cov_masked
 800
 801    def _regularized_optimization_ils(self, X_incomplete, covariance_matrix_file, n_components):
 802        def run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file):
 803            start_time = time.time()
 804            X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 805            has_missing = np.isnan(X_incomplete.data).sum() > 0
 806            S, _, strength_mat = self._cov(X_incomplete)
 807            logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
 808            S = self._demean(S, np.ones(S.shape[0]) / S.shape[0])
 809            if has_missing: 
 810                logging.info("Starting matrix completion. This will take a few minutes...")
 811                start_time = time.time()
 812                percent_inds_val = 20 # Percent of unmasked individuals to be masked for cross-validation 
 813                X_incomplete, masked_inds_val = self._create_validation_mask(X_incomplete.data, percent_inds_val) # masked_inds_val is the list of indices of the individuals masked for validation
 814                X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 815                S_prime, _, w_mat_prime = self._cov(X_incomplete)
 816                del X_incomplete
 817                gc.collect()
 818                S_prime = self._demean(S_prime, np.ones(S_prime.shape[0]) / S_prime.shape[0])
 819                S_robust, lam = self.matrix_completion(S, strength_mat, S_prime, w_mat_prime, lams=None, method="NN", 
 820                                                cv_inds=masked_inds_val)
 821                logging.info(f"Covariance Matrix --- %{time.time() - start_time:.2}s seconds ---")
 822            else:
 823                S_robust = S.copy()
 824            if covariance_matrix_file:
 825                np.save(covariance_matrix_file, S.data)
 826                if has_missing:
 827                    base, ext = os.path.splitext(covariance_matrix_file)
 828                    np.save(f"{base}_completed_{lam}{ext}", S_robust.data)
 829            return S, S_robust
 830
 831        def _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components):
 832            _, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)))
 833            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
 834            missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0)
 835            missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True)
 836            svd = TruncatedSVD(n_components, algorithm="arpack")
 837            svd.fit(S_robust)
 838            U = svd.components_.T
 839            U_nonmissing = U[nonmissing]
 840            D_nonmissing = np.diag(np.sqrt(svd.singular_values_))
 841            T = np.zeros((X_incomplete.shape[0], n_components))
 842            T_filled = np.matmul(U_nonmissing, D_nonmissing)
 843            i_filled = nonmissing
 844            T[nonmissing] = T_filled
 845            for i in missing_sorted:
 846                S_i = S[i][i_filled]
 847                W_i = np.diag(strength_mat[i][i_filled])
 848                A = np.matmul(W_i, T_filled)
 849                b = np.matmul(W_i, S_i)
 850                t = np.linalg.lstsq(A, b, rcond=None)[0]
 851                T[i] = t
 852                i_filled = np.append(i_filled, i)
 853                T_filled = np.append(T_filled, [t], axis=0)
 854            total_var = np.trace(S)
 855            for i in range(n_components):
 856                pc_percentvar = 100 * svd.singular_values_[i] / total_var
 857                logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
 858            return T
 859        
 860        S, S_robust = run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file)
 861        T = _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components)
 862        return T
 863
 864    def _cov_matrix_imputation(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components):
 865        start_time = time.time()
 866        X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 867        S, strength_vec, strength_mat = self._cov(X_incomplete)
 868        logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
 869        S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked)
 870        S = self._iterative_svd_matrix_completion(S_masked)
 871        weights = np.ones(len(strength_vec))
 872        weights_normalized = weights / weights.sum()
 873        S = self._demean(S, weights_normalized)
 874        W = np.diag(weights)
 875        WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W))
 876        if covariance_matrix_file:
 877            np.save(covariance_matrix_file, S.data)
 878        svd = TruncatedSVD(n_components, algorithm="arpack")
 879        svd.fit(WSW)
 880        U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T
 881        D = np.diag(np.sqrt(svd.singular_values_))
 882        T = np.matmul(U, D)
 883        total_var = np.trace(S)
 884        for i in range(n_components):
 885            pc_percentvar = 100 * svd.singular_values_[i] / total_var
 886            logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
 887        return T
 888
    def _cov_matrix_imputation_ils(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components):
        """
        mdPCA via covariance-matrix imputation followed by iterative least squares (ILS).

        Same covariance imputation as `_cov_matrix_imputation`, but afterwards
        samples with missing genotypes are re-projected one at a time via
        weighted least squares against the already-projected samples.

        Args:
            X_incomplete: (n_samples, n_snps) array with NaNs marking missing values.
            percent_vals_masked: Percentage of covariance entries to mask before imputation.
            covariance_matrix_file: Optional path to save the imputed, demeaned covariance matrix (.npy).
            n_components: Number of principal components.

        Returns:
            (n_samples, n_components) array of projected samples.
        """
        def run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file):
            # Build, mask, impute, and demean the covariance matrix (uniform weights).
            start_time = time.time()
            X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
            S, strength_vec, strength_mat = self._cov(X_incomplete)
            logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
            S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked)
            S = self._iterative_svd_matrix_completion(S_masked)
            weights = np.ones(len(strength_vec))
            weights_normalized = weights / weights.sum()
            S = self._demean(S, weights_normalized)
            W = np.diag(weights)
            if covariance_matrix_file:
                np.save(covariance_matrix_file, S.data)
            return S, W

        def compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components):
            # Recompute the raw covariance matrix (with its strengths) for the
            # least-squares projection of incomplete samples.
            S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)))
            S = self._demean(S, np.ones(S.shape[0]) / S.shape[0])
            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
            missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0)
            # Most-observed samples are projected first so later ones can rely on them.
            missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True)
            svd = TruncatedSVD(n_components, algorithm="arpack")
            WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W))
            svd.fit(WSW)
            U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T
            U_nonmissing = U[nonmissing]
            D_nonmissing = np.diag(np.sqrt(svd.singular_values_))
            T = np.zeros((X_incomplete.shape[0], n_components))
            T_filled = np.matmul(U_nonmissing, D_nonmissing)
            i_filled = nonmissing
            T[nonmissing] = T_filled
            for i in missing_sorted:
                # Weighted least squares, weighting by covariance strengths.
                S_i = S[i][i_filled]
                W_i = np.diag(strength_mat[i][i_filled])
                A = np.matmul(W_i, T_filled)
                b = np.matmul(W_i, S_i)
                t = np.linalg.lstsq(A, b, rcond=None)[0]
                T[i] = t
                i_filled = np.append(i_filled, i)
                T_filled = np.append(T_filled, [t], axis=0)
            total_var = np.trace(S)
            for i in range(n_components):
                pc_percentvar = 100 * svd.singular_values_[i] / total_var
                logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
            return T
        
        S, W = run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file)
        T = compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components)
        return T
 940
    def _nonmissing_pca_ils(self, X_incomplete, covariance_matrix_file, n_components):
        """
        PCA on the fully observed samples, then ILS projection of the rest.

        Builds the covariance matrix using only samples with no missing
        genotypes, eigen-decomposes it, and then projects samples with missing
        data one at a time via weighted least squares against the
        already-projected samples.

        Args:
            X_incomplete: (n_samples, n_snps) array with NaNs marking missing values.
            covariance_matrix_file: Optional path to save the non-missing covariance matrix (.npy).
            n_components: Number of principal components.

        Returns:
            (n_samples, n_components) array of projected samples.
        """
        def run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file):
            # Covariance matrix restricted to the fully observed samples.
            start_time = time.time()
            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
            X_incomplete_nonmissing = X_incomplete[nonmissing]
            X_incomplete_nonmissing = np.ma.array(X_incomplete_nonmissing, mask=np.isnan(X_incomplete_nonmissing))
            S_nonmissing, _, _ = self._cov(X_incomplete_nonmissing)
            logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
            S_nonmissing = self._demean(S_nonmissing, np.ones(S_nonmissing.shape[0]) / S_nonmissing.shape[0])    
            if covariance_matrix_file:
                np.save(covariance_matrix_file, S_nonmissing.data)
            return S_nonmissing
        
        def compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components):
            # Full covariance matrix (with strengths) for the least-squares projection.
            S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)))
            S = self._demean(S, np.ones(S.shape[0]) / S.shape[0])
            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
            missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0)
            # Most-observed samples are projected first so later ones can rely on them.
            missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True)
            svd = TruncatedSVD(n_components, algorithm="arpack")
            svd.fit(S_nonmissing)
            U_nonmissing = svd.components_.T
            D_nonmissing = np.diag(np.sqrt(svd.singular_values_))
            T = np.zeros((X_incomplete.shape[0], n_components))
            T_filled = np.matmul(U_nonmissing, D_nonmissing)
            i_filled = nonmissing
            T[nonmissing] = T_filled
            for i in missing_sorted:
                # Weighted least squares, weighting by covariance strengths.
                S_i = S[i][i_filled]
                W_i = np.diag(strength_mat[i][i_filled])
                A = np.matmul(W_i, T_filled)
                b = np.matmul(W_i, S_i)
                t = np.linalg.lstsq(A, b, rcond=None)[0]
                T[i] = t
                i_filled = np.append(i_filled, i)
                T_filled = np.append(T_filled, [t], axis=0)
            total_var = np.trace(S)
            for i in range(n_components):
                pc_percentvar = 100 * svd.singular_values_[i] / total_var
                logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
            return T
        
        S_nonmissing = run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file)
        T = compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components)
        return T
 987
 988    def _run_cov_matrix(self, X_incomplete, weights):
 989        """
 990        Runs the specified PCA method on the incomplete data.
 991        """
 992        if self.method == "weighted_cov_pca":
 993            return self._weighted_cov_pca(X_incomplete, weights, self.covariance_matrix_file, self.n_components)
 994        elif self.method == "regularized_optimization_ils":
 995            return self._regularized_optimization_ils(X_incomplete, self.covariance_matrix_file, self.n_components)
 996        elif self.method == "cov_matrix_imputation":
 997            return self._cov_matrix_imputation(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components)
 998        elif self.method == "cov_matrix_imputation_ils":
 999            return self._cov_matrix_imputation_ils(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components)
1000        elif self.method == "nonmissing_pca_ils":
1001            return self._nonmissing_pca_ils(X_incomplete, self.covariance_matrix_file, self.n_components)
1002        else:
1003            raise ValueError(f"Unsupported method: {self.method}")
1004
1005    def fit_transform(
1006            self,
1007            snpobj: Optional['SNPObject'] = None, 
1008            laiobj: Optional['LocalAncestryObject'] = None,
1009            labels_file: Optional[str] = None,
1010            ancestry: Optional[str] = None,
1011            average_strands: Optional[bool] = None
1012        ) -> np.ndarray:
1013        """
1014        Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data.
1015        
1016        This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying 
1017        masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these 
1018        masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality 
1019        while handling missing data as specified.
1020
1021        Args:
1022            snpobj (SNPObject, optional): 
1023                A SNPObject instance.
1024            laiobj (LAIObject, optional): 
1025                A LAIObject instance.
1026            labels_file (str, optional): 
1027                Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 
1028                column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 
1029                weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 
1030                combined into groups, with respective weights.
1031            ancestry (str, optional): 
1032                Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
1033            average_strands (bool, optional): 
1034                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
1035                If None, defaults to `self.average_strands`.
1036
1037        Returns:
1038            **array of shape (n_samples, n_components):** 
1039                The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`.
1040        """
1041        if snpobj is None:
1042            snpobj = self.snpobj
1043        if laiobj is None:
1044            laiobj = self.laiobj
1045        if labels_file is None:
1046            labels_file = self.labels_file
1047        if ancestry is None:
1048            ancestry = self.ancestry
1049        if average_strands is None:
1050            average_strands = self.average_strands
1051        
1052        if self.load_masks:
1053            # Load precomputed ancestry-based masked genotype matrix, SNP identifiers, haplotype identifiers, and weights
1054            mask, variants_id, haplotypes, _, weights = self._load_mask_file()
1055        else:
1056            # Process genotype data with optional ancestry-based masking and return the corresponding SNP and individual identifiers
1057            mask, variants_id, haplotypes = process_calldata_gt(
1058                self.snpobj,
1059                self.laiobj,
1060                self.ancestry,
1061                self.average_strands,
1062                self.force_nan_incomplete_strands,
1063                self.is_masked,
1064                self.rsid_or_chrompos
1065            )
1066
1067            # Process individual genomic labels and weights, aligning them with a masked genotype matrix by 
1068            # filtering out low-coverage individuals, reordering data to match the matrix structure, and 
1069            # handling group-based adjustments
1070            mask, haplotypes, _, weights = process_labels_weights(
1071                self.labels_file,
1072                mask,
1073                variants_id,
1074                haplotypes,
1075                self.average_strands,
1076                self.ancestry,
1077                self.min_percent_snps,
1078                self.group_snp_frequencies_only,
1079                self.groups_to_remove,
1080                self.is_weighted,
1081                self.save_masks,
1082                self.masks_file
1083            )
1084
1085        # Call run_cov_matrix with the specified method
1086        self.X_new_ = self._run_cov_matrix(
1087            mask[self.ancestry].T,
1088            weights
1089        )
1090
1091        self.haplotypes_ = haplotypes
1092        self.variants_id_ = variants_id
1093
1094        return self.X_new_
class mdPCA:
  19class mdPCA:
  20    """
  21    A class for performing missing data principal component analysis (mdPCA) on SNP data.
  22
  23    The mdPCA class focuses on genotype segments from the ancestry of interest when the `is_masked` flag is set to `True`. It offers 
  24    flexible processing options, allowing either separate handling of masked haplotype strands or combining (averaging) strands into a 
  25    single composite representation for each individual. Moreover, the analysis can be performed on individual-level data, group-level SNP 
  26    frequencies, or a combination of both.
  27
  28    If the `snpobj`, `laiobj`, `labels_file`, and `ancestry` parameters are all provided during instantiation, the `fit_transform` method 
  29    will be automatically called, applying the specified mdPCA method to transform the data upon instantiation.
  30    """
  31    def __init__(
  32        self,
  33        method: str = 'weighted_cov_pca',
  34        snpobj: Optional['SNPObject'] = None,
  35        laiobj: Optional['LocalAncestryObject'] = None,
  36        labels_file: Optional[str] = None,
  37        ancestry: Optional[Union[int, str]] = None,
  38        is_masked: bool = True,
  39        average_strands: bool = False,
  40        force_nan_incomplete_strands: bool = False,
  41        is_weighted: bool = False,
  42        groups_to_remove: List[str] = None,
  43        min_percent_snps: float = 4,
  44        group_snp_frequencies_only: bool = True,
  45        save_masks: bool = False,
  46        load_masks: bool = False,
  47        masks_file: Union[str, pathlib.Path] = 'masks.npz',
  48        output_file: Union[str, pathlib.Path] = 'output.tsv',
  49        covariance_matrix_file: Optional[str] = None,
  50        n_components: int = 2,
  51        rsid_or_chrompos: int = 2,
  52        percent_vals_masked: float = 0
  53    ):
  54        """
  55        Args:
  56            method (str, default='weighted_cov_pca'): 
  57                The PCA method to use for dimensionality reduction. Options include:
  58                - `'weighted_cov_pca'`: 
  59                    Simple covariance-based PCA, weighted by sample strengths.
  60                - `'regularized_optimization_ils'`: 
  61                    Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of 
  62                    missing samples using the original covariance matrix (considering only relevant elements not missing in 
  63                    the original covariance matrix for those samples).
  64                - `'cov_matrix_imputation'`: 
  65                    Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values 
  66                    using the Iterative SVD imputation method.
  67                - `'cov_matrix_imputation_ils'`: 
  68                    The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space 
  69                    given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just 
  70                    as done in 'regularized_optimization_ils'.
  71                - `'nonmissing_pca_ils'`: 
  72                    The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto 
  73                    the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as 
  74                    done in 'regularized_optimization_ils'.
  75            snpobj (SNPObject, optional): 
  76                A SNPObject instance.
  77            laiobj (LAIObject, optional): 
  78                A LAIObject instance.
  79            labels_file (str, optional): 
  80                Path to a `.tsv` file with metadata on individuals, including population labels, optional weights, and groupings. 
  81                The `indID` column must contain unique individual identifiers matching those in `laiobj` and `snpobj` for proper alignment. 
  82                The `label` column assigns population groups. If `is_weighted=True`, a `weight` column must be provided, assigning a weight to 
  83                each individual, where those with a weight of zero are removed. Optional columns include `combination` and `combination_weight` 
  84                to aggregate individuals into combined groups, where SNP frequencies represent their sequences. The `combination` column assigns 
  85                each individual to a specific group (0 for no combination, 1 for the first group, 2 for the second, etc.). All members of a group 
  86                must share the same `label` and `combination_weight`. If `combination_weight` column is not provided, the combinations are 
  87                assigned a default weight of `1`. Individuals excluded via `groups_to_remove` or those falling below `min_percent_snps` are removed 
  88                from the analysis.
  89            ancestry (int or str, optional): 
  90                Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. The ancestry input can be:
  91                - An integer (e.g., 0, 1, 2).
  92                - A string representation of an integer (e.g., '0', '1').
  93                - A string matching one of the ancestry map values (e.g., 'Africa').
  94            is_masked (bool, default=True): 
  95                If `True`, applies ancestry-specific masking to the genotype matrix, retaining only genotype data 
  96                corresponding to the specified `ancestry`. If `False`, uses the full, unmasked genotype matrix.
  97            average_strands (bool, default=False): 
  98                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
  99            force_nan_incomplete_strands (bool): 
 100                If `True`, sets the result to NaN if either haplotype in a pair is NaN. 
 101                Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1).
 102            is_weighted (bool, default=False): 
 103                If `True`, assigns individual weights from the `weight` column in `labels_file`. Otherwise, all individuals have equal weight of `1`.
 104            groups_to_remove (list of str, optional): 
 105                List with groups to exclude from analysis. Example: ['group1', 'group2'].
 106            min_percent_snps (float, default=4): 
 107                Minimum percentage of SNPs that must be known for an individual and of the ancestry of interest to be included in the analysis.
 108                All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
 109            group_snp_frequencies_only (bool, default=True):
 110                If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when `is_weighted` is 
 111                set to True and a `combination` column is provided in the `labels_file`,  meaning individuals are aggregated into groups based on their assigned 
 112                labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies when 
 113                `is_weighted` is True and a `combination` column is provided.
 114            save_masks (bool, default=False): 
 115                True if the masked matrices are to be saved in a `.npz` file, or False otherwise.
 116            load_masks (bool, default=False): 
 117                True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise.
 118            masks_file (str or pathlib.Path, default='masks.npz'): 
 119                Path to the `.npz` file used for saving/loading masked matrices.
 120            output_file (str or pathlib.Path, default='output.tsv'): 
 121                Path to the output `.tsv` file where mdPCA results are saved.
 122            covariance_matrix_file (str, optional): 
 123                Path to save the covariance matrix file in `.npy` format. If None, the covariance matrix is not saved. Default is None.
 124            n_components (int, default=2): 
 125                The number of principal components.
 126            rsid_or_chrompos (int, default=2): 
 127                Format indicator for SNP IDs in `self.__X_new_`. Use 1 for `rsID` format or 2 for `chromosome_position`.
 128            percent_vals_masked (float, default=0): 
 129                Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 
 130                `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`.
 131        """
 132        self.__snpobj = snpobj
 133        self.__laiobj = laiobj
 134        self.__labels_file = labels_file
 135        self.__ancestry = self._define_ancestry(ancestry, laiobj.ancestry_map)
 136        self.__method = method
 137        self.__is_masked = is_masked
 138        self.__average_strands = average_strands
 139        self.__force_nan_incomplete_strands = force_nan_incomplete_strands
 140        self.__is_weighted = is_weighted
 141        self.__groups_to_remove = groups_to_remove
 142        self.__min_percent_snps = min_percent_snps
 143        self.__group_snp_frequencies_only = group_snp_frequencies_only
 144        self.__save_masks = save_masks
 145        self.__load_masks = load_masks
 146        self.__masks_file = masks_file
 147        self.__output_file = output_file
 148        self.__covariance_matrix_file = covariance_matrix_file
 149        self.__n_components = n_components
 150        self.__rsid_or_chrompos = rsid_or_chrompos
 151        self.__percent_vals_masked = percent_vals_masked
 152        self.__X_new_ = None  # Store transformed SNP data
 153        self.__haplotypes_ = None  # Store haplotypes of X_new_ (after filtering if min_percent_snps > 0)
 154        self.__samples_ = None  # Store samples of X_new_ (after filtering if min_percent_snps > 0)
 155        self.__variants_id_ = None  # Store variants ID (after filtering SNPs not in laiobj)
 156
 157        # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided
 158        if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None:
 159            self.fit_transform(snpobj, laiobj, labels_file, ancestry)
 160
 161    def __getitem__(self, key):
 162        """
 163        To access an attribute of the class using the square bracket notation,
 164        similar to a dictionary.
 165        """
 166        try:
 167            return getattr(self, key)
 168        except AttributeError:
 169            raise KeyError(f'Invalid key: {key}')
 170
 171    def __setitem__(self, key, value):
 172        """
 173        To set an attribute of the class using the square bracket notation,
 174        similar to a dictionary.
 175        """
 176        try:
 177            setattr(self, key, value)
 178        except AttributeError:
 179            raise KeyError(f'Invalid key: {key}')
 180
    @property
    def method(self) -> str:
        """
        Retrieve `method`.
        
        Returns:
            **str:** The PCA method to use for dimensionality reduction. One of 
            `'weighted_cov_pca'`, `'regularized_optimization_ils'`, `'cov_matrix_imputation'`, 
            `'cov_matrix_imputation_ils'`, or `'nonmissing_pca_ils'`.
        """
        return self.__method

    @method.setter
    def method(self, x: str) -> None:
        """
        Update `method`.

        Args:
            x (str): The new PCA method name.
        """
        self.__method = x
 197
    @property
    def snpobj(self) -> Optional['SNPObject']:
        """
        Retrieve `snpobj`.
        
        Returns:
            **SNPObject:** A SNPObject instance, or None if not set.
        """
        return self.__snpobj

    @snpobj.setter
    def snpobj(self, x: 'SNPObject') -> None:
        """
        Update `snpobj`.

        Args:
            x (SNPObject): The new SNPObject instance.
        """
        self.__snpobj = x
 214
 215    @property
 216    def laiobj(self) -> Optional['LocalAncestryObject']:
 217        """
 218        Retrieve `laiobj`.
 219        
 220        Returns:
 221            **LocalAncestryObject:** A LocalAncestryObject instance.
 222        """
 223        return self.__laiobj
 224
 225    @laiobj.setter
 226    def laiobj(self, x: 'LocalAncestryObject') -> None:
 227        """
 228        Update `laiobj`.
 229        """
 230        self.__laiobj = x
 231
 232    @property
 233    def labels_file(self) -> Optional[str]:
 234        """
 235        Retrieve `labels_file`.
 236        
 237        Returns:
 238            **str:** Path to the labels file in `.tsv` format.
 239        """
 240        return self.__labels_file
 241
 242    @labels_file.setter
 243    def labels_file(self, x: str) -> None:
 244        """
 245        Update `labels_file`.
 246        """
 247        self.__labels_file = x
 248
    @property
    def ancestry(self) -> Optional[Union[int, str]]:
        """
        Retrieve `ancestry`.
        
        Returns:
            **int or str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
                May be an integer index, a string representation of an index, or an ancestry name
                (resolved by `_define_ancestry`).
        """
        return self.__ancestry

    @ancestry.setter
    def ancestry(self, x: Union[int, str]) -> None:
        """
        Update `ancestry`.
        """
        self.__ancestry = x
 265
 266    @property
 267    def is_masked(self) -> bool:
 268        """
 269        Retrieve `is_masked`.
 270        
 271        Returns:
 272            **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
 273        """
 274        return self.__is_masked
 275
 276    @is_masked.setter
 277    def is_masked(self, x: bool) -> None:
 278        """
 279        Update `is_masked`.
 280        """
 281        self.__is_masked = x
 282
 283    @property
 284    def average_strands(self) -> bool:
 285        """
 286        Retrieve `average_strands`.
 287        
 288        Returns:
 289            **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
 290        """
 291        return self.__average_strands
 292
 293    @average_strands.setter
 294    def average_strands(self, x: bool) -> None:
 295        """
 296        Update `average_strands`.
 297        """
 298        self.__average_strands = x
 299
 300    @property
 301    def force_nan_incomplete_strands(self) -> bool:
 302        """
 303        Retrieve `force_nan_incomplete_strands`.
 304        
 305        Returns:
 306            **bool**: If `True`, sets the result to NaN if either haplotype in a pair is NaN.
 307                      Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1).
 308        """
 309        return self.__force_nan_incomplete_strands
 310
 311    @force_nan_incomplete_strands.setter
 312    def force_nan_incomplete_strands(self, x: bool) -> None:
 313        """
 314        Update `force_nan_incomplete_strands`.
 315        """
 316        self.__force_nan_incomplete_strands = x
 317
 318    @property
 319    def is_weighted(self) -> bool:
 320        """
 321        Retrieve `is_weighted`.
 322        
 323        Returns:
 324            **bool:** True if weights are provided in the labels file, or False otherwise.
 325        """
 326        return self.__is_weighted
 327
 328    @is_weighted.setter
 329    def is_weighted(self, x: bool) -> None:
 330        """
 331        Update `is_weighted`.
 332        """
 333        self.__is_weighted = x
 334
    @property
    def groups_to_remove(self) -> Optional[Dict[int, List[str]]]:
        """
        Retrieve `groups_to_remove`.
        
        Returns:
            **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 
                lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`.
                May be None when no groups are excluded.
        """
        return self.__groups_to_remove

    @groups_to_remove.setter
    def groups_to_remove(self, x: Optional[Dict[int, List[str]]]) -> None:
        """
        Update `groups_to_remove`.
        """
        self.__groups_to_remove = x
 352
 353    @property
 354    def min_percent_snps(self) -> float:
 355        """
 356        Retrieve `min_percent_snps`.
 357        
 358        Returns:
 359            **float:** 
 360                Minimum percentage of SNPs that must be known for an individual to be included in the analysis.
 361                All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
 362        """
 363        return self.__min_percent_snps
 364
 365    @min_percent_snps.setter
 366    def min_percent_snps(self, x: float) -> None:
 367        """
 368        Update `min_percent_snps`.
 369        """
 370        self.__min_percent_snps = x
 371
 372    @property
 373    def group_snp_frequencies_only(self) -> bool:
 374        """
 375        Retrieve `group_snp_frequencies_only`.
 376        
 377        Returns:
 378            **bool:** 
 379                If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies 
 380                when `is_weighted` is set to True and a `combination` column is provided in the `labels_file`,  meaning individuals are 
 381                aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone 
 382                or on both individual-level and group-level SNP frequencies when `is_weighted` is True and a `combination` column is provided.
 383        """
 384        return self.__group_snp_frequencies_only
 385
 386    @group_snp_frequencies_only.setter
 387    def group_snp_frequencies_only(self, x: bool) -> None:
 388        """
 389        Update `group_snp_frequencies_only`.
 390        """
 391        self.__group_snp_frequencies_only = x
 392
 393    @property
 394    def save_masks(self) -> bool:
 395        """
 396        Retrieve `save_masks`.
 397        
 398        Returns:
 399            **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise.
 400        """
 401        return self.__save_masks
 402
 403    @save_masks.setter
 404    def save_masks(self, x: bool) -> None:
 405        """
 406        Update `save_masks`.
 407        """
 408        self.__save_masks = x
 409
 410    @property
 411    def load_masks(self) -> bool:
 412        """
 413        Retrieve `load_masks`.
 414        
 415        Returns:
 416            **bool:** 
 417                True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 
 418                by `masks_file`, or False otherwise.
 419        """
 420        return self.__load_masks
 421
 422    @load_masks.setter
 423    def load_masks(self, x: bool) -> None:
 424        """
 425        Update `load_masks`.
 426        """
 427        self.__load_masks = x
 428
 429    @property
 430    def masks_file(self) -> Union[str, pathlib.Path]:
 431        """
 432        Retrieve `masks_file`.
 433        
 434        Returns:
 435            **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices.
 436        """
 437        return self.__masks_file
 438
 439    @masks_file.setter
 440    def masks_file(self, x: Union[str, pathlib.Path]) -> None:
 441        """
 442        Update `masks_file`.
 443        """
 444        self.__masks_file = x
 445
 446    @property
 447    def output_file(self) -> Union[str, pathlib.Path]:
 448        """
 449        Retrieve `output_file`.
 450        
 451        Returns:
 452            **str or pathlib.Path:** Path to the output `.tsv` file where mdPCA results are saved.
 453        """
 454        return self.__output_file
 455
 456    @output_file.setter
 457    def output_file(self, x: Union[str, pathlib.Path]) -> None:
 458        """
 459        Update `output_file`.
 460        """
 461        self.__output_file = x
 462
 463    @property
 464    def covariance_matrix_file(self) -> Optional[str]:
 465        """
 466        Retrieve `covariance_matrix_file`.
 467        
 468        Returns:
 469            **str:** Path to save the covariance matrix file in `.npy` format.
 470        """
 471        return self.__covariance_matrix_file
 472    
 473    @covariance_matrix_file.setter
 474    def covariance_matrix_file(self, x: Optional[str]) -> None:
 475        """
 476        Update `covariance_matrix_file`.
 477        """
 478        self.__covariance_matrix_file = x
 479
 480    @property
 481    def n_components(self) -> int:
 482        """
 483        Retrieve `n_components`.
 484        
 485        Returns:
 486            **int:** The number of principal components.
 487        """
 488        return self.__n_components
 489
 490    @n_components.setter
 491    def n_components(self, x: int) -> None:
 492        """
 493        Update `n_components`.
 494        """
 495        self.__n_components = x
 496
 497    @property
 498    def rsid_or_chrompos(self) -> int:
 499        """
 500        Retrieve `rsid_or_chrompos`.
 501        
 502        Returns:
 503            **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`.
 504        """
 505        return self.__rsid_or_chrompos
 506
 507    @rsid_or_chrompos.setter
 508    def rsid_or_chrompos(self, x: int) -> None:
 509        """
 510        Update `rsid_or_chrompos`.
 511        """
 512        self.__rsid_or_chrompos = x
 513
 514    @property
 515    def percent_vals_masked(self) -> float:
 516        """
 517        Retrieve `percent_vals_masked`.
 518        
 519        Returns:
 520            **float:** 
 521                Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 
 522                `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`.
 523        """
 524        return self.__percent_vals_masked
 525
 526    @percent_vals_masked.setter
 527    def percent_vals_masked(self, x: float) -> None:
 528        """
 529        Update `percent_vals_masked`.
 530        """
 531        self.__percent_vals_masked = x
 532
 533    @property
 534    def X_new_(self) -> Optional[np.ndarray]:
 535        """
 536        Retrieve `X_new_`.
 537
 538        Returns:
 539            **array of shape (n_samples, n_components):** 
 540                The transformed SNP data projected onto the `n_components` principal components.
 541                n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 
 542                (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 
 543                `(n_samples * 2, n_components)`.
 544        """
 545        return self.__X_new_
 546
 547    @X_new_.setter
 548    def X_new_(self, x: np.ndarray) -> None:
 549        """
 550        Update `X_new_`.
 551        """
 552        self.__X_new_ = x
 553
 554    @property
 555    def haplotypes_(self) -> Optional[List[str]]:
 556        """
 557        Retrieve `haplotypes_`.
 558
 559        Returns:
 560            list of str:
 561                A list of unique haplotype identifiers.
 562        """
 563        if isinstance(self.__haplotypes_, np.ndarray):
 564            return self.__haplotypes_.ravel().tolist()  # Flatten and convert NumPy array to a list
 565        elif isinstance(self.__haplotypes_, list):
 566            if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray):
 567                return self.__haplotypes_[0].ravel().tolist()  # Handle list containing a single array
 568            return self.__haplotypes_  # Already a flat list
 569        elif self.__haplotypes_ is None:
 570            return None  # If no haplotypes are set
 571        else:
 572            raise TypeError("`haplotypes_` must be a list or a NumPy array.")
 573
 574    @haplotypes_.setter
 575    def haplotypes_(self, x: Union[np.ndarray, List[str]]) -> None:
 576        """
 577        Update `haplotypes_`.
 578        """
 579        if isinstance(x, np.ndarray):
 580            self.__haplotypes_ = x.ravel().tolist()  # Flatten and convert to a list
 581        elif isinstance(x, list):
 582            if len(x) == 1 and isinstance(x[0], np.ndarray):  # Handle list containing a single array
 583                self.__haplotypes_ = x[0].ravel().tolist()
 584            else:
 585                self.__haplotypes_ = x  # Use directly if already a list
 586        else:
 587            raise TypeError("`x` must be a list or a NumPy array.")
 588
 589    @property
 590    def samples_(self) -> Optional[List[str]]:
 591        """
 592        Retrieve `samples_`.
 593
 594        Returns:
 595            list of str:
 596                A list of sample identifiers based on `haplotypes_` and `average_strands`.
 597        """
 598        haplotypes = self.haplotypes_
 599        if haplotypes is None:
 600            return None
 601        if self.__average_strands:
 602            return haplotypes
 603        else:
 604            return [x[:-2] for x in haplotypes]
 605
 606    @property
 607    def variants_id_(self) -> Optional[np.ndarray]:
 608        """
 609        Retrieve `variants_id_`.
 610
 611        Returns:
 612            **array of shape (n_snp,):** 
 613                An array containing unique identifiers (IDs) for each SNP,
 614                potentially reduced if there are SNPs not present in the `laiobj`.
 615                The format will depend on `rsid_or_chrompos`.
 616        """
 617        return self.__variants_id_
 618
 619    @variants_id_.setter
 620    def variants_id_(self, x: np.ndarray) -> None:
 621        """
 622        Update `variants_id_`.
 623        """
 624        self.__variants_id_ = x
 625
 626    @property
 627    def n_haplotypes(self) -> Optional[int]:
 628        """
 629        Retrieve `n_haplotypes`.
 630
 631        Returns:
 632            **int:**
 633                The total number of haplotypes, potentially reduced if filtering is applied 
 634                (`min_percent_snps > 0`).
 635        """
 636        return len(self.haplotypes_)
 637
 638    @property
 639    def n_samples(self) -> Optional[int]:
 640        """
 641        Retrieve `n_samples`.
 642
 643        Returns:
 644            **int:**
 645                The total number of samples, potentially reduced if filtering is applied 
 646                (`min_percent_snps > 0`).
 647        """
 648        return len(np.unique(self.samples_))
 649
 650    def copy(self) -> 'mdPCA':
 651        """
 652        Create and return a copy of `self`.
 653
 654        Returns:
 655            **mdPCA:** 
 656                A new instance of the current object.
 657        """
 658        return copy.copy(self)
 659
 660    @staticmethod
 661    def _define_ancestry(ancestry, ancestry_map):
 662        """
 663        Determine the ancestry index based on different input types.
 664
 665        Args:
 666            ancestry (int or str): The ancestry input, which can be:
 667                - An integer (e.g., 0, 1, 2).
 668                - A string representation of an integer (e.g., '0', '1').
 669                - A string matching one of the ancestry map values (e.g., 'Africa').
 670            ancestry_map (dict): A dictionary mapping ancestry indices (as strings) to ancestry names.
 671
 672        Returns:
 673            int: The corresponding ancestry index.
 674        """
 675        if isinstance(ancestry, int):  
 676            return ancestry  
 677        elif isinstance(ancestry, str) and ancestry.isdigit():  
 678            return int(ancestry)  
 679        elif ancestry in ancestry_map.values():  
 680            return int(next(key for key, value in ancestry_map.items() if value == ancestry))  
 681        else:  
 682            raise ValueError(f"Invalid ancestry input: {ancestry}")
 683
 684    def _load_mask_file(self):
 685        """
 686        Load previously saved masked genotype data from an `.npz` file.
 687        """
 688        mask_files = np.load(self.masks_file, allow_pickle=True)
 689        mask = mask_files['mask']
 690        rs_ID_list = mask_files['rs_ID_list']
 691        ind_ID_list = mask_files['ind_ID_list']
 692        labels = mask_files['labels']
 693        weights = mask_files['weights']
 694        return mask, rs_ID_list, ind_ID_list, labels, weights
 695
 696    @staticmethod
 697    def _compute_strength_vector(X):
 698        strength_vector = np.sum(~np.isnan(X), axis=1) / X.shape[1]
 699        return strength_vector
 700
 701    @staticmethod
 702    def _compute_strength_matrix(X):
 703        notmask = (~np.isnan(X)).astype(np.float32)
 704        strength_matrix = np.dot(notmask, notmask.T)
 705        strength_matrix /= X.shape[1]
 706        return strength_matrix
 707
    def _cov(self, x):
        """
        Pairwise covariance of the rows of `x` in the presence of missing data,
        plus per-row and pairwise observation-strength statistics.

        Args:
            x (array-like of shape (n_rows, n_cols)): Data matrix; missing entries
                are NaN and/or masked (the array is converted to a masked array).

        Returns:
            tuple:
                - array of shape (n_rows, n_rows): Covariance matrix, each entry
                  normalized by the count of columns observed in both rows minus `ddof`.
                - array of shape (n_rows,): Fraction of observed entries per row.
                - array of shape (n_rows, n_rows): Pairwise fraction of jointly observed columns.
        """
        ddof = 1  # Sample (not population) normalization.
        x = np.ma.array(x, ndmin=2, copy=True, dtype=np.float32)
        xmask = np.ma.getmaskarray(x)
        xnotmask = np.logical_not(xmask).astype(np.float32)
        # Per-pair count of columns observed in BOTH rows, minus ddof: the
        # normalization factor for each covariance entry.
        fact = np.dot(xnotmask, xnotmask.T) * 1. - ddof
        del xnotmask
        gc.collect()  # Free the large indicator matrix before the next big product.
        result = (np.ma.dot(x, x.T, strict=False) / fact).squeeze()
        # Underlying data still carries NaNs for masked entries; the strength
        # helpers detect missingness via np.isnan.
        x = x.data
        strength_vec = self._compute_strength_vector(x)
        strength_mat = self._compute_strength_matrix(x)
        return result.data, strength_vec, strength_mat
 721
 722    @staticmethod
 723    def _demean(S, w):
 724        w_sum = np.matmul(w, S)
 725        w_rowsum = w_sum.reshape((1, S.shape[0]))
 726        w_colsum = w_sum.reshape((S.shape[0], 1))
 727        w_totalsum = np.dot(w_sum, w)
 728        S -= (w_rowsum + w_colsum) - w_totalsum
 729        return S
 730
 731    @staticmethod
 732    def _iterative_svd_matrix_completion(cov):
 733        num_masked = np.isnan(cov).sum()
 734        if num_masked > 0:
 735            start_time = time.time()
 736            start_rank = 1
 737            end_rank = 10
 738            rank = 5
 739            choose_best = True
 740            num_cores = 5
 741            cov_complete = IterativeSVD(start_rank=start_rank, end_rank=end_rank, rank=rank, choose_best=choose_best, num_cores=num_cores).fit_transform(cov)
 742            logging.info("Iterative SVD --- %s seconds ---" % (time.time() - start_time))
 743        else:
 744            cov_complete = cov
 745        return cov_complete
 746
    def _weighted_cov_pca(self, X_incomplete, weights, covariance_matrix_file, n_components):
        """
        Weighted covariance PCA on incomplete genotype data.

        Computes the row-by-row covariance while ignoring missing entries,
        demeans it with the normalized weights, and eigendecomposes the
        weighted covariance `sqrt(W) S sqrt(W)` via truncated SVD.

        Args:
            X_incomplete (array of shape (n_rows, n_snps)): Genotype matrix with NaNs for missing values.
            weights (array of shape (n_rows,)): Per-row (haplotype/group) weights.
            covariance_matrix_file (str or None): If truthy, path where the demeaned covariance matrix is saved.
            n_components (int): Number of principal components.

        Returns:
            array of shape (n_rows, n_components): Projection of each row onto the PCs.
        """
        start_time = time.time()
        X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
        S, _, _ = self._cov(X_incomplete)
        logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
        weights_normalized = weights / weights.sum()
        S = self._demean(S, weights_normalized)
        W = np.diag(weights)
        # Weighted covariance: sqrt(W) S sqrt(W).
        WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W))
        if covariance_matrix_file:
            np.save(covariance_matrix_file, S.data)
        svd = TruncatedSVD(n_components, algorithm="arpack")
        svd.fit(WSW)
        # Undo the weighting to recover eigenvectors in the original scale.
        U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T
        D = np.diag(np.sqrt(svd.singular_values_))
        T = np.matmul(U, D)
        total_var = np.trace(S)
        for i in range(n_components):
            pc_percentvar = 100 * svd.singular_values_[i] / total_var
            logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
        return T
 768
    @staticmethod
    def _create_validation_mask(X_incomplete, percent_inds):
        """
        Mask extra fully observed rows of `X_incomplete` for cross-validation.

        Selects `percent_inds` percent of the rows that currently have no missing
        values (with a fixed RNG seed for reproducibility) and sets a portion of
        their entries to NaN, matching the missingness rate observed in the
        already-masked rows.

        NOTE: `X_incomplete` is modified in place and also returned.

        Args:
            X_incomplete (array of shape (n_rows, n_cols)): Data matrix with NaNs for missing entries.
            percent_inds (float): Percentage of fully observed rows to mask for validation.

        Returns:
            tuple:
                - array: The matrix with the additional validation entries masked.
                - list of int: Sorted row indices newly masked for validation.
        """
        np.random.seed(1)  # Fixed seed so the validation mask is reproducible.
        masked_rows = np.isnan(X_incomplete).any(axis=1)
        masked_inds = np.flatnonzero(masked_rows)
        X_masked = X_incomplete[masked_rows] 
        # Missingness rate of the already-masked rows; reproduced in the validation rows.
        percent_masked = 100 * np.isnan(X_masked).sum() / (X_masked.shape[0] * X_masked.shape[1])
        unmasked_rows = ~masked_rows
        X_unmasked = X_incomplete[unmasked_rows]
        # NOTE: `masked_rows` is reused here with a different meaning — row
        # *indices* into X_unmasked chosen for validation masking.
        masked_rows = np.random.choice(range(X_unmasked.shape[0]), size=int(X_unmasked.shape[0] * percent_inds / 100), replace=False)
        X_masked_rows = X_unmasked[masked_rows,:]
        mask = np.zeros(X_masked_rows.shape[0] * X_masked_rows.shape[1], dtype=np.int8)
        mask[:int(X_masked_rows.shape[0] * X_masked_rows.shape[1] * percent_masked / 100)] = 1
        np.random.shuffle(mask)
        mask = mask.astype(bool)
        mask = mask.reshape(X_masked_rows.shape)
        X_masked_rows[mask] = np.nan
        X_unmasked[masked_rows] = X_masked_rows
        X_incomplete[unmasked_rows] = X_unmasked
        masked_rows_new = np.isnan(X_incomplete).any(axis=1)
        masked_inds_new = np.flatnonzero(masked_rows_new)
        # Validation rows = rows that are masked now but were not before.
        masked_inds_val = sorted(list(set(masked_inds_new) - set(masked_inds)))
        return X_incomplete, masked_inds_val
 792
 793    @staticmethod    
 794    def _create_cov_mask(cov, strength_mat, percent_vals_masked):
 795        cov_flattened = cov.reshape(-1).copy()
 796        num_vals_masked = int(percent_vals_masked * len(cov_flattened) / 100)
 797        pos_masked = strength_mat.reshape(-1).argsort()[:num_vals_masked]
 798        cov_flattened[pos_masked] = np.nan
 799        cov_masked = cov_flattened.reshape(cov.shape)
 800        return cov_masked
 801
 802    def _regularized_optimization_ils(self, X_incomplete, covariance_matrix_file, n_components):
 803        def run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file):
 804            start_time = time.time()
 805            X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 806            has_missing = np.isnan(X_incomplete.data).sum() > 0
 807            S, _, strength_mat = self._cov(X_incomplete)
 808            logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
 809            S = self._demean(S, np.ones(S.shape[0]) / S.shape[0])
 810            if has_missing: 
 811                logging.info("Starting matrix completion. This will take a few minutes...")
 812                start_time = time.time()
 813                percent_inds_val = 20 # Percent of unmasked individuals to be masked for cross-validation 
 814                X_incomplete, masked_inds_val = self._create_validation_mask(X_incomplete.data, percent_inds_val) # masked_inds_val is the list of indices of the individuals masked for validation
 815                X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 816                S_prime, _, w_mat_prime = self._cov(X_incomplete)
 817                del X_incomplete
 818                gc.collect()
 819                S_prime = self._demean(S_prime, np.ones(S_prime.shape[0]) / S_prime.shape[0])
 820                S_robust, lam = self.matrix_completion(S, strength_mat, S_prime, w_mat_prime, lams=None, method="NN", 
 821                                                cv_inds=masked_inds_val)
 822                logging.info(f"Covariance Matrix --- %{time.time() - start_time:.2}s seconds ---")
 823            else:
 824                S_robust = S.copy()
 825            if covariance_matrix_file:
 826                np.save(covariance_matrix_file, S.data)
 827                if has_missing:
 828                    base, ext = os.path.splitext(covariance_matrix_file)
 829                    np.save(f"{base}_completed_{lam}{ext}", S_robust.data)
 830            return S, S_robust
 831
 832        def _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components):
 833            _, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)))
 834            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
 835            missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0)
 836            missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True)
 837            svd = TruncatedSVD(n_components, algorithm="arpack")
 838            svd.fit(S_robust)
 839            U = svd.components_.T
 840            U_nonmissing = U[nonmissing]
 841            D_nonmissing = np.diag(np.sqrt(svd.singular_values_))
 842            T = np.zeros((X_incomplete.shape[0], n_components))
 843            T_filled = np.matmul(U_nonmissing, D_nonmissing)
 844            i_filled = nonmissing
 845            T[nonmissing] = T_filled
 846            for i in missing_sorted:
 847                S_i = S[i][i_filled]
 848                W_i = np.diag(strength_mat[i][i_filled])
 849                A = np.matmul(W_i, T_filled)
 850                b = np.matmul(W_i, S_i)
 851                t = np.linalg.lstsq(A, b, rcond=None)[0]
 852                T[i] = t
 853                i_filled = np.append(i_filled, i)
 854                T_filled = np.append(T_filled, [t], axis=0)
 855            total_var = np.trace(S)
 856            for i in range(n_components):
 857                pc_percentvar = 100 * svd.singular_values_[i] / total_var
 858                logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
 859            return T
 860        
 861        S, S_robust = run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file)
 862        T = _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components)
 863        return T
 864
    def _cov_matrix_imputation(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components):
        """
        PCA via covariance-matrix imputation.

        Computes the covariance matrix of the incomplete data, masks the
        `percent_vals_masked` percent of entries with the lowest support
        (fewest jointly observed SNPs), re-imputes them with iterative SVD,
        and performs unweighted PCA on the completed, demeaned matrix.

        Args:
            X_incomplete (array of shape (n_rows, n_snps)): Genotype matrix with NaNs for missing values.
            percent_vals_masked (float): Percentage of covariance entries to mask and impute.
            covariance_matrix_file (str or None): If truthy, path where the completed covariance matrix is saved.
            n_components (int): Number of principal components.

        Returns:
            array of shape (n_rows, n_components): Projection of each row onto the PCs.
        """
        start_time = time.time()
        X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
        S, strength_vec, strength_mat = self._cov(X_incomplete)
        logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
        # Mask the least-supported entries, then re-impute them.
        S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked)
        S = self._iterative_svd_matrix_completion(S_masked)
        # Uniform weights: this method does not use per-individual weighting.
        weights = np.ones(len(strength_vec))
        weights_normalized = weights / weights.sum()
        S = self._demean(S, weights_normalized)
        W = np.diag(weights)
        WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W))
        if covariance_matrix_file:
            np.save(covariance_matrix_file, S.data)
        svd = TruncatedSVD(n_components, algorithm="arpack")
        svd.fit(WSW)
        U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T
        D = np.diag(np.sqrt(svd.singular_values_))
        T = np.matmul(U, D)
        total_var = np.trace(S)
        for i in range(n_components):
            pc_percentvar = 100 * svd.singular_values_[i] / total_var
            logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
        return T
 889
    def _cov_matrix_imputation_ils(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components):
        """
        PCA via covariance-matrix imputation with iterative least squares (ILS).

        Like `_cov_matrix_imputation`, the covariance matrix is masked at its
        least-supported entries and re-imputed with iterative SVD. Projection
        then follows the ILS scheme: fully observed rows come directly from the
        SVD, and rows with missing data are placed one at a time by weighted
        least squares against the already-placed rows.

        Args:
            X_incomplete (array of shape (n_rows, n_snps)): Genotype matrix with NaNs for missing values.
            percent_vals_masked (float): Percentage of covariance entries to mask and impute.
            covariance_matrix_file (str or None): If truthy, path where the covariance matrix is saved.
            n_components (int): Number of principal components.

        Returns:
            array of shape (n_rows, n_components): Projection of each row onto the PCs.
        """
        def run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file):
            # Build the masked-and-imputed, demeaned covariance matrix S and the
            # (uniform) weight matrix W.
            start_time = time.time()
            X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
            S, strength_vec, strength_mat = self._cov(X_incomplete)
            logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
            S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked)
            S = self._iterative_svd_matrix_completion(S_masked)
            weights = np.ones(len(strength_vec))
            weights_normalized = weights / weights.sum()
            S = self._demean(S, weights_normalized)
            W = np.diag(weights)
            if covariance_matrix_file:
                np.save(covariance_matrix_file, S.data)
            return S, W

        def compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components):
            # NOTE: S is recomputed here from the raw data (shadowing the imputed
            # S passed in); the imputed matrix only shaped the SVD input below.
            S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)))
            S = self._demean(S, np.ones(S.shape[0]) / S.shape[0])
            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
            missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0)
            # Most-observed rows first, so later fits lean on better-constrained ones.
            missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True)
            svd = TruncatedSVD(n_components, algorithm="arpack")
            WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W))
            svd.fit(WSW)
            U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T
            U_nonmissing = U[nonmissing]
            D_nonmissing = np.diag(np.sqrt(svd.singular_values_))
            T = np.zeros((X_incomplete.shape[0], n_components))
            T_filled = np.matmul(U_nonmissing, D_nonmissing)
            i_filled = nonmissing
            T[nonmissing] = T_filled
            for i in missing_sorted:
                # Weighted least-squares fit of row i against the rows already placed.
                S_i = S[i][i_filled]
                W_i = np.diag(strength_mat[i][i_filled])
                A = np.matmul(W_i, T_filled)
                b = np.matmul(W_i, S_i)
                t = np.linalg.lstsq(A, b, rcond=None)[0]
                T[i] = t
                i_filled = np.append(i_filled, i)
                T_filled = np.append(T_filled, [t], axis=0)
            total_var = np.trace(S)
            for i in range(n_components):
                pc_percentvar = 100 * svd.singular_values_[i] / total_var
                logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
            return T
        
        S, W = run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file)
        T = compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components)
        return T
 941
    def _nonmissing_pca_ils(self, X_incomplete, covariance_matrix_file, n_components):
        """
        PCA on fully observed rows, extended to the rest via iterative least squares.

        The covariance matrix of only the rows with no missing values is
        eigendecomposed; those rows are projected directly. Every remaining row
        is then placed, most-observed first, by weighted least squares against
        the rows already projected.

        Args:
            X_incomplete (array of shape (n_rows, n_snps)): Genotype matrix with NaNs for missing values.
            covariance_matrix_file (str or None): If truthy, path where the nonmissing-rows covariance matrix is saved.
            n_components (int): Number of principal components.

        Returns:
            array of shape (n_rows, n_components): Projection of each row onto the PCs.
        """
        def run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file):
            # Demeaned covariance of the fully observed rows only.
            start_time = time.time()
            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
            X_incomplete_nonmissing = X_incomplete[nonmissing]
            X_incomplete_nonmissing = np.ma.array(X_incomplete_nonmissing, mask=np.isnan(X_incomplete_nonmissing))
            S_nonmissing, _, _ = self._cov(X_incomplete_nonmissing)
            logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
            S_nonmissing = self._demean(S_nonmissing, np.ones(S_nonmissing.shape[0]) / S_nonmissing.shape[0])    
            if covariance_matrix_file:
                np.save(covariance_matrix_file, S_nonmissing.data)
            return S_nonmissing
        
        def compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components):
            # Full-data covariance and strengths drive the least-squares placement
            # of the rows that have missing values.
            S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)))
            S = self._demean(S, np.ones(S.shape[0]) / S.shape[0])
            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
            missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0)
            # Most-observed rows first, so later fits lean on better-constrained ones.
            missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True)
            svd = TruncatedSVD(n_components, algorithm="arpack")
            svd.fit(S_nonmissing)
            U_nonmissing = svd.components_.T
            D_nonmissing = np.diag(np.sqrt(svd.singular_values_))
            T = np.zeros((X_incomplete.shape[0], n_components))
            T_filled = np.matmul(U_nonmissing, D_nonmissing)
            i_filled = nonmissing
            T[nonmissing] = T_filled
            for i in missing_sorted:
                # Weighted least-squares fit of row i against the rows already placed.
                S_i = S[i][i_filled]
                W_i = np.diag(strength_mat[i][i_filled])
                A = np.matmul(W_i, T_filled)
                b = np.matmul(W_i, S_i)
                t = np.linalg.lstsq(A, b, rcond=None)[0]
                T[i] = t
                i_filled = np.append(i_filled, i)
                T_filled = np.append(T_filled, [t], axis=0)
            total_var = np.trace(S)
            for i in range(n_components):
                pc_percentvar = 100 * svd.singular_values_[i] / total_var
                logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
            return T
        
        S_nonmissing = run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file)
        T = compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components)
        return T
 988
 989    def _run_cov_matrix(self, X_incomplete, weights):
 990        """
 991        Runs the specified PCA method on the incomplete data.
 992        """
 993        if self.method == "weighted_cov_pca":
 994            return self._weighted_cov_pca(X_incomplete, weights, self.covariance_matrix_file, self.n_components)
 995        elif self.method == "regularized_optimization_ils":
 996            return self._regularized_optimization_ils(X_incomplete, self.covariance_matrix_file, self.n_components)
 997        elif self.method == "cov_matrix_imputation":
 998            return self._cov_matrix_imputation(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components)
 999        elif self.method == "cov_matrix_imputation_ils":
1000            return self._cov_matrix_imputation_ils(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components)
1001        elif self.method == "nonmissing_pca_ils":
1002            return self._nonmissing_pca_ils(X_incomplete, self.covariance_matrix_file, self.n_components)
1003        else:
1004            raise ValueError(f"Unsupported method: {self.method}")
1005
1006    def fit_transform(
1007            self,
1008            snpobj: Optional['SNPObject'] = None, 
1009            laiobj: Optional['LocalAncestryObject'] = None,
1010            labels_file: Optional[str] = None,
1011            ancestry: Optional[str] = None,
1012            average_strands: Optional[bool] = None
1013        ) -> np.ndarray:
1014        """
1015        Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data.
1016        
1017        This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying 
1018        masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these 
1019        masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality 
1020        while handling missing data as specified.
1021
1022        Args:
1023            snpobj (SNPObject, optional): 
1024                A SNPObject instance.
1025            laiobj (LAIObject, optional): 
1026                A LAIObject instance.
1027            labels_file (str, optional): 
1028                Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 
1029                column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 
1030                weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 
1031                combined into groups, with respective weights.
1032            ancestry (str, optional): 
1033                Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
1034            average_strands (bool, optional): 
1035                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
1036                If None, defaults to `self.average_strands`.
1037
1038        Returns:
1039            **array of shape (n_samples, n_components):** 
1040                The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`.
1041        """
1042        if snpobj is None:
1043            snpobj = self.snpobj
1044        if laiobj is None:
1045            laiobj = self.laiobj
1046        if labels_file is None:
1047            labels_file = self.labels_file
1048        if ancestry is None:
1049            ancestry = self.ancestry
1050        if average_strands is None:
1051            average_strands = self.average_strands
1052        
1053        if self.load_masks:
1054            # Load precomputed ancestry-based masked genotype matrix, SNP identifiers, haplotype identifiers, and weights
1055            mask, variants_id, haplotypes, _, weights = self._load_mask_file()
1056        else:
1057            # Process genotype data with optional ancestry-based masking and return the corresponding SNP and individual identifiers
1058            mask, variants_id, haplotypes = process_calldata_gt(
1059                self.snpobj,
1060                self.laiobj,
1061                self.ancestry,
1062                self.average_strands,
1063                self.force_nan_incomplete_strands,
1064                self.is_masked,
1065                self.rsid_or_chrompos
1066            )
1067
1068            # Process individual genomic labels and weights, aligning them with a masked genotype matrix by 
1069            # filtering out low-coverage individuals, reordering data to match the matrix structure, and 
1070            # handling group-based adjustments
1071            mask, haplotypes, _, weights = process_labels_weights(
1072                self.labels_file,
1073                mask,
1074                variants_id,
1075                haplotypes,
1076                self.average_strands,
1077                self.ancestry,
1078                self.min_percent_snps,
1079                self.group_snp_frequencies_only,
1080                self.groups_to_remove,
1081                self.is_weighted,
1082                self.save_masks,
1083                self.masks_file
1084            )
1085
1086        # Call run_cov_matrix with the specified method
1087        self.X_new_ = self._run_cov_matrix(
1088            mask[self.ancestry].T,
1089            weights
1090        )
1091
1092        self.haplotypes_ = haplotypes
1093        self.variants_id_ = variants_id
1094
1095        return self.X_new_

A class for performing missing data principal component analysis (mdPCA) on SNP data.

The mdPCA class focuses on genotype segments from the ancestry of interest when the is_masked flag is set to True. It offers flexible processing options, allowing either separate handling of masked haplotype strands or combining (averaging) strands into a single composite representation for each individual. Moreover, the analysis can be performed on individual-level data, group-level SNP frequencies, or a combination of both.

If the snpobj, laiobj, labels_file, and ancestry parameters are all provided during instantiation, the fit_transform method will be automatically called, applying the specified mdPCA method to transform the data upon instantiation.

mdPCA( method: str = 'weighted_cov_pca', snpobj: Optional[snputils.snp.genobj.SNPObject] = None, laiobj: Optional[snputils.ancestry.genobj.LocalAncestryObject] = None, labels_file: Optional[str] = None, ancestry: Union[int, str, NoneType] = None, is_masked: bool = True, average_strands: bool = False, force_nan_incomplete_strands: bool = False, is_weighted: bool = False, groups_to_remove: List[str] = None, min_percent_snps: float = 4, group_snp_frequencies_only: bool = True, save_masks: bool = False, load_masks: bool = False, masks_file: Union[str, pathlib.Path] = 'masks.npz', output_file: Union[str, pathlib.Path] = 'output.tsv', covariance_matrix_file: Optional[str] = None, n_components: int = 2, rsid_or_chrompos: int = 2, percent_vals_masked: float = 0)
 31    def __init__(
 32        self,
 33        method: str = 'weighted_cov_pca',
 34        snpobj: Optional['SNPObject'] = None,
 35        laiobj: Optional['LocalAncestryObject'] = None,
 36        labels_file: Optional[str] = None,
 37        ancestry: Optional[Union[int, str]] = None,
 38        is_masked: bool = True,
 39        average_strands: bool = False,
 40        force_nan_incomplete_strands: bool = False,
 41        is_weighted: bool = False,
 42        groups_to_remove: List[str] = None,
 43        min_percent_snps: float = 4,
 44        group_snp_frequencies_only: bool = True,
 45        save_masks: bool = False,
 46        load_masks: bool = False,
 47        masks_file: Union[str, pathlib.Path] = 'masks.npz',
 48        output_file: Union[str, pathlib.Path] = 'output.tsv',
 49        covariance_matrix_file: Optional[str] = None,
 50        n_components: int = 2,
 51        rsid_or_chrompos: int = 2,
 52        percent_vals_masked: float = 0
 53    ):
 54        """
 55        Args:
 56            method (str, default='weighted_cov_pca'): 
 57                The PCA method to use for dimensionality reduction. Options include:
 58                - `'weighted_cov_pca'`: 
 59                    Simple covariance-based PCA, weighted by sample strengths.
 60                - `'regularized_optimization_ils'`: 
 61                    Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of 
 62                    missing samples using the original covariance matrix (considering only relevant elements not missing in 
 63                    the original covariance matrix for those samples).
 64                - `'cov_matrix_imputation'`: 
 65                    Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values 
 66                    using the Iterative SVD imputation method.
 67                - `'cov_matrix_imputation_ils'`: 
 68                    The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space 
 69                    given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just 
 70                    as done in 'regularized_optimization_ils'.
 71                - `'nonmissing_pca_ils'`: 
 72                    The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto 
 73                    the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as 
 74                    done in 'regularized_optimization_ils'.
 75            snpobj (SNPObject, optional): 
 76                A SNPObject instance.
 77            laiobj (LAIObject, optional): 
 78                A LAIObject instance.
 79            labels_file (str, optional): 
 80                Path to a `.tsv` file with metadata on individuals, including population labels, optional weights, and groupings. 
 81                The `indID` column must contain unique individual identifiers matching those in `laiobj` and `snpobj` for proper alignment. 
 82                The `label` column assigns population groups. If `is_weighted=True`, a `weight` column must be provided, assigning a weight to 
 83                each individual, where those with a weight of zero are removed. Optional columns include `combination` and `combination_weight` 
 84                to aggregate individuals into combined groups, where SNP frequencies represent their sequences. The `combination` column assigns 
 85                each individual to a specific group (0 for no combination, 1 for the first group, 2 for the second, etc.). All members of a group 
 86                must share the same `label` and `combination_weight`. If `combination_weight` column is not provided, the combinations are 
 87                assigned a default weight of `1`. Individuals excluded via `groups_to_remove` or those falling below `min_percent_snps` are removed 
 88                from the analysis.
 89            ancestry (int or str, optional): 
 90                Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. The ancestry input can be:
 91                - An integer (e.g., 0, 1, 2).
 92                - A string representation of an integer (e.g., '0', '1').
 93                - A string matching one of the ancestry map values (e.g., 'Africa').
 94            is_masked (bool, default=True): 
 95                If `True`, applies ancestry-specific masking to the genotype matrix, retaining only genotype data 
 96                corresponding to the specified `ancestry`. If `False`, uses the full, unmasked genotype matrix.
 97            average_strands (bool, default=False): 
 98                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
 99            force_nan_incomplete_strands (bool): 
100                If `True`, sets the result to NaN if either haplotype in a pair is NaN. 
101                Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1).
102            is_weighted (bool, default=False): 
103                If `True`, assigns individual weights from the `weight` column in `labels_file`. Otherwise, all individuals have equal weight of `1`.
104            groups_to_remove (list of str, optional): 
105                List with groups to exclude from analysis. Example: ['group1', 'group2'].
106            min_percent_snps (float, default=4): 
107                Minimum percentage of SNPs that must be known for an individual and of the ancestry of interest to be included in the analysis.
108                All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
109            group_snp_frequencies_only (bool, default=True):
110                If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when `is_weighted` is 
111                set to True and a `combination` column is provided in the `labels_file`,  meaning individuals are aggregated into groups based on their assigned 
112                labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies when 
113                `is_weighted` is True and a `combination` column is provided.
114            save_masks (bool, default=False): 
115                True if the masked matrices are to be saved in a `.npz` file, or False otherwise.
116            load_masks (bool, default=False): 
117                True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise.
118            masks_file (str or pathlib.Path, default='masks.npz'): 
119                Path to the `.npz` file used for saving/loading masked matrices.
120            output_file (str or pathlib.Path, default='output.tsv'): 
121                Path to the output `.tsv` file where mdPCA results are saved.
122            covariance_matrix_file (str, optional): 
123                Path to save the covariance matrix file in `.npy` format. If None, the covariance matrix is not saved. Default is None.
124            n_components (int, default=2): 
125                The number of principal components.
126            rsid_or_chrompos (int, default=2): 
127                Format indicator for SNP IDs in `self.__X_new_`. Use 1 for `rsID` format or 2 for `chromosome_position`.
128            percent_vals_masked (float, default=0): 
129                Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 
130                `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`.
131        """
132        self.__snpobj = snpobj
133        self.__laiobj = laiobj
134        self.__labels_file = labels_file
135        self.__ancestry = self._define_ancestry(ancestry, laiobj.ancestry_map)
136        self.__method = method
137        self.__is_masked = is_masked
138        self.__average_strands = average_strands
139        self.__force_nan_incomplete_strands = force_nan_incomplete_strands
140        self.__is_weighted = is_weighted
141        self.__groups_to_remove = groups_to_remove
142        self.__min_percent_snps = min_percent_snps
143        self.__group_snp_frequencies_only = group_snp_frequencies_only
144        self.__save_masks = save_masks
145        self.__load_masks = load_masks
146        self.__masks_file = masks_file
147        self.__output_file = output_file
148        self.__covariance_matrix_file = covariance_matrix_file
149        self.__n_components = n_components
150        self.__rsid_or_chrompos = rsid_or_chrompos
151        self.__percent_vals_masked = percent_vals_masked
152        self.__X_new_ = None  # Store transformed SNP data
153        self.__haplotypes_ = None  # Store haplotypes of X_new_ (after filtering if min_percent_snps > 0)
154        self.__samples_ = None  # Store samples of X_new_ (after filtering if min_percent_snps > 0)
155        self.__variants_id_ = None  # Store variants ID (after filtering SNPs not in laiobj)
156
157        # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided
158        if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None:
159            self.fit_transform(snpobj, laiobj, labels_file, ancestry)
Arguments:
  • method (str, default='weighted_cov_pca'): The PCA method to use for dimensionality reduction. Options include:
    • 'weighted_cov_pca': Simple covariance-based PCA, weighted by sample strengths.
    • 'regularized_optimization_ils': Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of missing samples using the original covariance matrix (considering only relevant elements not missing in the original covariance matrix for those samples).
    • 'cov_matrix_imputation': Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values using the Iterative SVD imputation method.
    • 'cov_matrix_imputation_ils': The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just as done in 'regularized_optimization_ils'.
    • 'nonmissing_pca_ils': The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as done in 'regularized_optimization_ils'.
  • snpobj (SNPObject, optional): A SNPObject instance.
  • laiobj (LAIObject, optional): A LAIObject instance.
  • labels_file (str, optional): Path to a .tsv file with metadata on individuals, including population labels, optional weights, and groupings. The indID column must contain unique individual identifiers matching those in laiobj and snpobj for proper alignment. The label column assigns population groups. If is_weighted=True, a weight column must be provided, assigning a weight to each individual, where those with a weight of zero are removed. Optional columns include combination and combination_weight to aggregate individuals into combined groups, where SNP frequencies represent their sequences. The combination column assigns each individual to a specific group (0 for no combination, 1 for the first group, 2 for the second, etc.). All members of a group must share the same label and combination_weight. If combination_weight column is not provided, the combinations are assigned a default weight of 1. Individuals excluded via groups_to_remove or those falling below min_percent_snps are removed from the analysis.
  • ancestry (int or str, optional): Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. The ancestry input can be:
    • An integer (e.g., 0, 1, 2).
    • A string representation of an integer (e.g., '0', '1').
    • A string matching one of the ancestry map values (e.g., 'Africa').
  • is_masked (bool, default=True): If True, applies ancestry-specific masking to the genotype matrix, retaining only genotype data corresponding to the specified ancestry. If False, uses the full, unmasked genotype matrix.
  • average_strands (bool, default=False): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
  • force_nan_incomplete_strands (bool): If True, sets the result to NaN if either haplotype in a pair is NaN. Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1).
  • is_weighted (bool, default=False): If True, assigns individual weights from the weight column in labels_file. Otherwise, all individuals have equal weight of 1.
  • groups_to_remove (list of str, optional): List with groups to exclude from analysis. Example: ['group1', 'group2'].
  • min_percent_snps (float, default=4): Minimum percentage of SNPs that must be known for an individual and of the ancestry of interest to be included in the analysis. All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
  • group_snp_frequencies_only (bool, default=True): If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when is_weighted is set to True and a combination column is provided in the labels_file, meaning individuals are aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies when is_weighted is True and a combination column is provided.
  • save_masks (bool, default=False): True if the masked matrices are to be saved in a .npz file, or False otherwise.
  • load_masks (bool, default=False): True if the masked matrices are to be loaded from a pre-existing .npz file specified by masks_file, or False otherwise.
  • masks_file (str or pathlib.Path, default='masks.npz'): Path to the .npz file used for saving/loading masked matrices.
  • output_file (str or pathlib.Path, default='output.tsv'): Path to the output .tsv file where mdPCA results are saved.
  • covariance_matrix_file (str, optional): Path to save the covariance matrix file in .npy format. If None, the covariance matrix is not saved. Default is None.
  • n_components (int, default=2): The number of principal components.
  • rsid_or_chrompos (int, default=2): Format indicator for SNP IDs in self.__X_new_. Use 1 for rsID format or 2 for chromosome_position.
  • percent_vals_masked (float, default=0): Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if method is 'cov_matrix_imputation' or 'cov_matrix_imputation_ils'.
method: str
181    @property
182    def method(self) -> str:
183        """
184        Retrieve `method`.
185        
186        Returns:
187            **str:** The PCA method to use for dimensionality reduction.
188        """
189        return self.__method

Retrieve method.

Returns:

str: The PCA method to use for dimensionality reduction.

snpobj: Optional[snputils.snp.genobj.SNPObject]
198    @property
199    def snpobj(self) -> Optional['SNPObject']:
200        """
201        Retrieve `snpobj`.
202        
203        Returns:
204            **SNPObject:** A SNPObject instance.
205        """
206        return self.__snpobj

Retrieve snpobj.

Returns:

SNPObject: A SNPObject instance.

laiobj: Optional[snputils.ancestry.genobj.LocalAncestryObject]
215    @property
216    def laiobj(self) -> Optional['LocalAncestryObject']:
217        """
218        Retrieve `laiobj`.
219        
220        Returns:
221            **LocalAncestryObject:** A LocalAncestryObject instance.
222        """
223        return self.__laiobj

Retrieve laiobj.

Returns:

LocalAncestryObject: A LocalAncestryObject instance.

labels_file: Optional[str]
232    @property
233    def labels_file(self) -> Optional[str]:
234        """
235        Retrieve `labels_file`.
236        
237        Returns:
238            **str:** Path to the labels file in `.tsv` format.
239        """
240        return self.__labels_file

Retrieve labels_file.

Returns:

str: Path to the labels file in .tsv format.

ancestry: Optional[str]
249    @property
250    def ancestry(self) -> Optional[str]:
251        """
252        Retrieve `ancestry`.
253        
254        Returns:
255            **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
256        """
257        return self.__ancestry

Retrieve ancestry.

Returns:

str: Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.

is_masked: bool
266    @property
267    def is_masked(self) -> bool:
268        """
269        Retrieve `is_masked`.
270        
271        Returns:
272            **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
273        """
274        return self.__is_masked

Retrieve is_masked.

Returns:

bool: True if an ancestry file is passed for ancestry-specific masking, or False otherwise.

average_strands: bool
283    @property
284    def average_strands(self) -> bool:
285        """
286        Retrieve `average_strands`.
287        
288        Returns:
289            **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
290        """
291        return self.__average_strands

Retrieve average_strands.

Returns:

bool: True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.

force_nan_incomplete_strands: bool
300    @property
301    def force_nan_incomplete_strands(self) -> bool:
302        """
303        Retrieve `force_nan_incomplete_strands`.
304        
305        Returns:
306            **bool**: If `True`, sets the result to NaN if either haplotype in a pair is NaN.
307                      Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1).
308        """
309        return self.__force_nan_incomplete_strands

Retrieve force_nan_incomplete_strands.

Returns:

bool: If True, sets the result to NaN if either haplotype in a pair is NaN. Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1).

is_weighted: bool
318    @property
319    def is_weighted(self) -> bool:
320        """
321        Retrieve `is_weighted`.
322        
323        Returns:
324            **bool:** True if weights are provided in the labels file, or False otherwise.
325        """
326        return self.__is_weighted

Retrieve is_weighted.

Returns:

bool: True if weights are provided in the labels file, or False otherwise.

groups_to_remove: Dict[int, List[str]]
335    @property
336    def groups_to_remove(self) -> Dict[int, List[str]]:
337        """
338        Retrieve `groups_to_remove`.
339        
340        Returns:
341            **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 
342                lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`.
343        """
344        return self.__groups_to_remove

Retrieve groups_to_remove.

Returns:

dict of int to list of str: Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are lists of groups to remove for each array. Example: {1: ['group1', 'group2'], 2: [], 3: ['group3']}.

min_percent_snps: float
353    @property
354    def min_percent_snps(self) -> float:
355        """
356        Retrieve `min_percent_snps`.
357        
358        Returns:
359            **float:** 
360                Minimum percentage of SNPs that must be known for an individual to be included in the analysis.
361                All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
362        """
363        return self.__min_percent_snps

Retrieve min_percent_snps.

Returns:

float: Minimum percentage of SNPs that must be known for an individual to be included in the analysis. All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.

group_snp_frequencies_only: bool
372    @property
373    def group_snp_frequencies_only(self) -> bool:
374        """
375        Retrieve `group_snp_frequencies_only`.
376        
377        Returns:
378            **bool:** 
379                If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies 
380                when `is_weighted` is set to True and a `combination` column is provided in the `labels_file`,  meaning individuals are 
381                aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone 
382                or on both individual-level and group-level SNP frequencies when `is_weighted` is True and a `combination` column is provided.
383        """
384        return self.__group_snp_frequencies_only

Retrieve group_snp_frequencies_only.

Returns:

bool: If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when is_weighted is set to True and a combination column is provided in the labels_file, meaning individuals are aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies when is_weighted is True and a combination column is provided.

save_masks: bool
393    @property
394    def save_masks(self) -> bool:
395        """
396        Retrieve `save_masks`.
397        
398        Returns:
399            **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise.
400        """
401        return self.__save_masks

Retrieve save_masks.

Returns:

bool: True if the masked matrices are to be saved in a .npz file, or False otherwise.

load_masks: bool
410    @property
411    def load_masks(self) -> bool:
412        """
413        Retrieve `load_masks`.
414        
415        Returns:
416            **bool:** 
417                True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 
418                by `masks_file`, or False otherwise.
419        """
420        return self.__load_masks

Retrieve load_masks.

Returns:

bool: True if the masked matrices are to be loaded from a pre-existing .npz file specified by masks_file, or False otherwise.

masks_file: Union[str, pathlib.Path]
429    @property
430    def masks_file(self) -> Union[str, pathlib.Path]:
431        """
432        Retrieve `masks_file`.
433        
434        Returns:
435            **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices.
436        """
437        return self.__masks_file

Retrieve masks_file.

Returns:

str or pathlib.Path: Path to the .npz file used for saving/loading masked matrices.

output_file: Union[str, pathlib.Path]
446    @property
447    def output_file(self) -> Union[str, pathlib.Path]:
448        """
449        Retrieve `output_file`.
450        
451        Returns:
452            **str or pathlib.Path:** Path to the output `.tsv` file where mdPCA results are saved.
453        """
454        return self.__output_file

Retrieve output_file.

Returns:

str or pathlib.Path: Path to the output .tsv file where mdPCA results are saved.

covariance_matrix_file: Optional[str]
463    @property
464    def covariance_matrix_file(self) -> Optional[str]:
465        """
466        Retrieve `covariance_matrix_file`.
467        
468        Returns:
469            **str:** Path to save the covariance matrix file in `.npy` format.
470        """
471        return self.__covariance_matrix_file

Retrieve covariance_matrix_file.

Returns:

str: Path to save the covariance matrix file in .npy format.

n_components: int
480    @property
481    def n_components(self) -> int:
482        """
483        Retrieve `n_components`.
484        
485        Returns:
486            **int:** The number of principal components.
487        """
488        return self.__n_components

Retrieve n_components.

Returns:

int: The number of principal components.

rsid_or_chrompos: int
497    @property
498    def rsid_or_chrompos(self) -> int:
499        """
500        Retrieve `rsid_or_chrompos`.
501        
502        Returns:
503            **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`.
504        """
505        return self.__rsid_or_chrompos

Retrieve rsid_or_chrompos.

Returns:

int: Format indicator for SNP IDs in the SNP data. Use 1 for rsID format or 2 for chromosome_position.

percent_vals_masked: float
514    @property
515    def percent_vals_masked(self) -> float:
516        """
517        Retrieve `percent_vals_masked`.
518        
519        Returns:
520            **float:** 
521                Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 
522                `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`.
523        """
524        return self.__percent_vals_masked

Retrieve percent_vals_masked.

Returns:

float: Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if method is 'cov_matrix_imputation' or 'cov_matrix_imputation_ils'.

X_new_: Optional[numpy.ndarray]
533    @property
534    def X_new_(self) -> Optional[np.ndarray]:
535        """
536        Retrieve `X_new_`.
537
538        Returns:
539            **array of shape (n_samples, n_components):** 
540                The transformed SNP data projected onto the `n_components` principal components.
541                n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 
542                (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 
543                `(n_samples * 2, n_components)`.
544        """
545        return self.__X_new_

Retrieve X_new_.

Returns:

array of shape (n_samples, n_components): The transformed SNP data projected onto the n_components principal components. n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied (min_percent_snps > 0). For diploid individuals without filtering, the shape is (n_samples * 2, n_components).

haplotypes_: Optional[List[str]]
554    @property
555    def haplotypes_(self) -> Optional[List[str]]:
556        """
557        Retrieve `haplotypes_`.
558
559        Returns:
560            list of str:
561                A list of unique haplotype identifiers.
562        """
563        if isinstance(self.__haplotypes_, np.ndarray):
564            return self.__haplotypes_.ravel().tolist()  # Flatten and convert NumPy array to a list
565        elif isinstance(self.__haplotypes_, list):
566            if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray):
567                return self.__haplotypes_[0].ravel().tolist()  # Handle list containing a single array
568            return self.__haplotypes_  # Already a flat list
569        elif self.__haplotypes_ is None:
570            return None  # If no haplotypes are set
571        else:
572            raise TypeError("`haplotypes_` must be a list or a NumPy array.")

Retrieve haplotypes_.

Returns:

list of str: A list of unique haplotype identifiers.

samples_: Optional[List[str]]
589    @property
590    def samples_(self) -> Optional[List[str]]:
591        """
592        Retrieve `samples_`.
593
594        Returns:
595            list of str:
596                A list of sample identifiers based on `haplotypes_` and `average_strands`.
597        """
598        haplotypes = self.haplotypes_
599        if haplotypes is None:
600            return None
601        if self.__average_strands:
602            return haplotypes
603        else:
604            return [x[:-2] for x in haplotypes]

Retrieve samples_.

Returns:

list of str: A list of sample identifiers based on haplotypes_ and average_strands.

variants_id_: Optional[numpy.ndarray]
606    @property
607    def variants_id_(self) -> Optional[np.ndarray]:
608        """
609        Retrieve `variants_id_`.
610
611        Returns:
612            **array of shape (n_snp,):** 
613                An array containing unique identifiers (IDs) for each SNP,
614                potentially reduced if there are SNPs not present in the `laiobj`.
615                The format will depend on `rsid_or_chrompos`.
616        """
617        return self.__variants_id_

Retrieve variants_id_.

Returns:

array of shape (n_snp,): An array containing unique identifiers (IDs) for each SNP, potentially reduced if there are SNPs not present in the laiobj. The format will depend on rsid_or_chrompos.

n_haplotypes: Optional[int]
626    @property
627    def n_haplotypes(self) -> Optional[int]:
628        """
629        Retrieve `n_haplotypes`.
630
631        Returns:
632            **int:**
633                The total number of haplotypes, potentially reduced if filtering is applied 
634                (`min_percent_snps > 0`).
635        """
636        return len(self.haplotypes_)

Retrieve n_haplotypes.

Returns:

int: The total number of haplotypes, potentially reduced if filtering is applied (min_percent_snps > 0).

n_samples: Optional[int]
638    @property
639    def n_samples(self) -> Optional[int]:
640        """
641        Retrieve `n_samples`.
642
643        Returns:
644            **int:**
645                The total number of samples, potentially reduced if filtering is applied 
646                (`min_percent_snps > 0`).
647        """
648        return len(np.unique(self.samples_))

Retrieve n_samples.

Returns:

int: The total number of samples, potentially reduced if filtering is applied (min_percent_snps > 0).

def copy(self) -> mdPCA:
650    def copy(self) -> 'mdPCA':
651        """
652        Create and return a copy of `self`.
653
654        Returns:
655            **mdPCA:** 
656                A new instance of the current object.
657        """
658        return copy.copy(self)

Create and return a copy of self.

Returns:

mdPCA: A new instance of the current object.

def fit_transform( self, snpobj: Optional[snputils.snp.genobj.SNPObject] = None, laiobj: Optional[snputils.ancestry.genobj.LocalAncestryObject] = None, labels_file: Optional[str] = None, ancestry: Optional[str] = None, average_strands: Optional[bool] = None) -> numpy.ndarray:
1006    def fit_transform(
1007            self,
1008            snpobj: Optional['SNPObject'] = None, 
1009            laiobj: Optional['LocalAncestryObject'] = None,
1010            labels_file: Optional[str] = None,
1011            ancestry: Optional[str] = None,
1012            average_strands: Optional[bool] = None
1013        ) -> np.ndarray:
1014        """
1015        Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data.
1016        
1017        This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying 
1018        masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these 
1019        masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality 
1020        while handling missing data as specified.
1021
1022        Args:
1023            snpobj (SNPObject, optional): 
1024                A SNPObject instance.
1025            laiobj (LAIObject, optional): 
1026                A LAIObject instance.
1027            labels_file (str, optional): 
1028                Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 
1029                column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 
1030                weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 
1031                combined into groups, with respective weights.
1032            ancestry (str, optional): 
1033                Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
1034            average_strands (bool, optional): 
1035                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
1036                If None, defaults to `self.average_strands`.
1037
1038        Returns:
1039            **array of shape (n_samples, n_components):** 
1040                The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`.
1041        """
1042        if snpobj is None:
1043            snpobj = self.snpobj
1044        if laiobj is None:
1045            laiobj = self.laiobj
1046        if labels_file is None:
1047            labels_file = self.labels_file
1048        if ancestry is None:
1049            ancestry = self.ancestry
1050        if average_strands is None:
1051            average_strands = self.average_strands
1052        
1053        if self.load_masks:
1054            # Load precomputed ancestry-based masked genotype matrix, SNP identifiers, haplotype identifiers, and weights
1055            mask, variants_id, haplotypes, _, weights = self._load_mask_file()
1056        else:
1057            # Process genotype data with optional ancestry-based masking and return the corresponding SNP and individual identifiers
1058            mask, variants_id, haplotypes = process_calldata_gt(
1059                self.snpobj,
1060                self.laiobj,
1061                self.ancestry,
1062                self.average_strands,
1063                self.force_nan_incomplete_strands,
1064                self.is_masked,
1065                self.rsid_or_chrompos
1066            )
1067
1068            # Process individual genomic labels and weights, aligning them with a masked genotype matrix by 
1069            # filtering out low-coverage individuals, reordering data to match the matrix structure, and 
1070            # handling group-based adjustments
1071            mask, haplotypes, _, weights = process_labels_weights(
1072                self.labels_file,
1073                mask,
1074                variants_id,
1075                haplotypes,
1076                self.average_strands,
1077                self.ancestry,
1078                self.min_percent_snps,
1079                self.group_snp_frequencies_only,
1080                self.groups_to_remove,
1081                self.is_weighted,
1082                self.save_masks,
1083                self.masks_file
1084            )
1085
1086        # Call run_cov_matrix with the specified method
1087        self.X_new_ = self._run_cov_matrix(
1088            mask[self.ancestry].T,
1089            weights
1090        )
1091
1092        self.haplotypes_ = haplotypes
1093        self.variants_id_ = variants_id
1094
1095        return self.X_new_

Fit the model to the SNP data stored in the provided snpobj and apply the dimensionality reduction on the same SNP data.

This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality while handling missing data as specified.

Arguments:
  • snpobj (SNPObject, optional): A SNPObject instance.
  • laiobj (LAIObject, optional): A LAIObject instance.
  • labels_file (str, optional): Path to the labels file in .tsv format. The first column, indID, contains the individual identifiers, and the second column, label, specifies the groups for all individuals. If is_weighted=True, a weight column with individual weights is required. Optionally, combination and combination_weight columns can specify sets of individuals to be combined into groups, with respective weights.
  • ancestry (str, optional): Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
  • average_strands (bool, optional): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. If None, defaults to self.average_strands.
Returns:

array of shape (n_samples, n_components): The transformed SNP data projected onto the n_components principal components, stored in self.X_new_.