snputils.processing.mdpca

   1import pathlib
   2import os
   3import time
   4import gc
   5import logging
   6import logging.config
   7import numpy as np
   8import copy
   9from typing import Optional, Dict, List, Union
  10from sklearn.decomposition import TruncatedSVD
  11
  12from snputils.snp.genobj.snpobj import SNPObject
  13from snputils.ancestry.genobj.local import LocalAncestryObject
  14from ._utils.gen_tools import array_process, process_labels_weights
  15from ._utils.iterative_svd import IterativeSVD
  16
  17
  18class mdPCA:
  19    """
  20    A class for missing data principal component analysis (mdPCA).
  21
  22    This class supports both separate and averaged strand processing for SNP data. If the `snpobj`, 
  23    `laiobj`, `labels_file`, and `ancestry` parameters are all provided during instantiation, 
  24    the `fit_transform` method will be automatically called, applying the specified mdPCA method to transform 
  25    the data upon instantiation.
  26    """
  27    def __init__(
  28        self,
  29        method: str = 'weighted_cov_pca',
  30        snpobj: Optional['SNPObject'] = None,
  31        laiobj: Optional['LocalAncestryObject'] = None,
  32        labels_file: Optional[str] = None,
  33        ancestry: Optional[str] = None,
  34        is_masked: bool = True,
  35        prob_thresh: float = 0,
  36        average_strands: bool = False,
  37        is_weighted: bool = False,
  38        groups_to_remove: Dict[int, List[str]] = {},
  39        min_percent_snps: float = 4,
  40        save_masks: bool = False,
  41        load_masks: bool = False,
  42        masks_file: Union[str, pathlib.Path] = 'masks.npz',
  43        output_file: Union[str, pathlib.Path] = 'output.tsv',
  44        covariance_matrix_file: Optional[str] = None,
  45        n_components: int = 2,
  46        rsid_or_chrompos: int = 2,
  47        percent_vals_masked: float = 0
  48    ):
  49        """
  50        Args:
  51            method (str, default='weighted_cov_pca'): 
  52                The PCA method to use for dimensionality reduction. Options include:
  53                - `'weighted_cov_pca'`: 
  54                    Simple covariance-based PCA, weighted by sample strengths.
  55                - `'regularized_optimization_ils'`: 
  56                    Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of 
  57                    missing samples using the original covariance matrix (considering only relevant elements not missing in 
  58                    the original covariance matrix for those samples).
  59                - `'cov_matrix_imputation'`: 
  60                    Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values 
  61                    using the Iterative SVD imputation method.
  62                - `'cov_matrix_imputation_ils'`: 
  63                    The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space 
  64                    given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just 
  65                    as done in 'regularized_optimization_ils'.
  66                - `'nonmissing_pca_ils'`: 
  67                    The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto 
  68                    the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as 
  69                    done in 'regularized_optimization_ils'.
  70            snpobj (SNPObject, optional): 
  71                A SNPObject instance.
  72            laiobj (LAIObject, optional): 
  73                A LAIObject instance.
  74            labels_file (str, optional): 
  75                Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 
  76                column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 
  77                weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 
  78                combined into groups, with respective weights.
  79            ancestry (str, optional): 
  80                Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`.
  81            is_masked (bool, default=True): 
  82                True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
  83            prob_thresh (float, default=0): 
  84                Minimum probability threshold for a SNP to belong to an ancestry.
  85            average_strands (bool, default=False): 
  86                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
  87            is_weighted (bool, default=False): 
  88                True if weights are provided in the labels file, or False otherwise.
  89            groups_to_remove (dict of int to list of str, default={}): 
  90                Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 
  91                lists of groups to remove for each array.
  92                Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`.
  93            min_percent_snps (float, default=4): 
  94                Minimum percentage of SNPs that must be known for an individual to be included in the analysis.
  95                All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
  96            save_masks (bool, default=False): 
  97                True if the masked matrices are to be saved in a `.npz` file, or False otherwise.
  98            load_masks (bool, default=False): 
  99                True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise.
 100            masks_file (str or pathlib.Path, default='masks.npz'): 
 101                Path to the `.npz` file used for saving/loading masked matrices.
 102            output_file (str or pathlib.Path, default='output.tsv'): 
 103                Path to the output `.tsv` file where mdPCA results are saved.
 104            covariance_matrix_file (str, optional): 
 105                Path to save the covariance matrix file in `.npy` format. If None, the covariance matrix is not saved. Default is None.
 106            n_components (int, default=2): 
 107                The number of principal components.
 108            rsid_or_chrompos (int, default=2): 
 109                Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`.
 110            percent_vals_masked (float, default=0): 
 111                Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 
 112                `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`.
 113        """
 114        self.__snpobj = snpobj
 115        self.__laiobj = laiobj
 116        self.__labels_file = labels_file
 117        self.__ancestry = ancestry
 118        self.__method = method
 119        self.__is_masked = is_masked
 120        self.__prob_thresh = prob_thresh
 121        self.__average_strands = average_strands
 122        self.__is_weighted = is_weighted
 123        self.__groups_to_remove = groups_to_remove
 124        self.__min_percent_snps = min_percent_snps
 125        self.__save_masks = save_masks
 126        self.__load_masks = load_masks
 127        self.__masks_file = masks_file
 128        self.__output_file = output_file
 129        self.__covariance_matrix_file = covariance_matrix_file
 130        self.__n_components = n_components
 131        self.__rsid_or_chrompos = rsid_or_chrompos
 132        self.__percent_vals_masked = percent_vals_masked
 133        self.__X_new_ = None  # Store transformed SNP data
 134        self.__haplotypes_ = None  # Store haplotypes after filtering if min_percent_snps > 0
 135        self.__samples_ = None  # Store samples after filtering if min_percent_snps > 0
 136
 137        # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided
 138        if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None:
 139            self.fit_transform(snpobj, laiobj, labels_file, ancestry)
 140
 141    def __getitem__(self, key):
 142        """
 143        To access an attribute of the class using the square bracket notation,
 144        similar to a dictionary.
 145        """
 146        try:
 147            return getattr(self, key)
 148        except AttributeError:
 149            raise KeyError(f'Invalid key: {key}')
 150
 151    def __setitem__(self, key, value):
 152        """
 153        To set an attribute of the class using the square bracket notation,
 154        similar to a dictionary.
 155        """
 156        try:
 157            setattr(self, key, value)
 158        except AttributeError:
 159            raise KeyError(f'Invalid key: {key}')
 160
 161    @property
 162    def method(self) -> str:
 163        """
 164        Retrieve `method`.
 165        
 166        Returns:
 167            **str:** The PCA method to use for dimensionality reduction.
 168        """
 169        return self.__method
 170
 171    @method.setter
 172    def method(self, x: str) -> None:
 173        """
 174        Update `method`.
 175        """
 176        self.__method = x
 177
 178    @property
 179    def snpobj(self) -> Optional['SNPObject']:
 180        """
 181        Retrieve `snpobj`.
 182        
 183        Returns:
 184            **SNPObject:** A SNPObject instance.
 185        """
 186        return self.__snpobj
 187
 188    @snpobj.setter
 189    def snpobj(self, x: 'SNPObject') -> None:
 190        """
 191        Update `snpobj`.
 192        """
 193        self.__snpobj = x
 194
 195    @property
 196    def laiobj(self) -> Optional['LocalAncestryObject']:
 197        """
 198        Retrieve `laiobj`.
 199        
 200        Returns:
 201            **LocalAncestryObject:** A LocalAncestryObject instance.
 202        """
 203        return self.__laiobj
 204
 205    @laiobj.setter
 206    def laiobj(self, x: 'LocalAncestryObject') -> None:
 207        """
 208        Update `laiobj`.
 209        """
 210        self.__laiobj = x
 211
 212    @property
 213    def labels_file(self) -> Optional[str]:
 214        """
 215        Retrieve `labels_file`.
 216        
 217        Returns:
 218            **str:** Path to the labels file in `.tsv` format.
 219        """
 220        return self.__labels_file
 221
 222    @labels_file.setter
 223    def labels_file(self, x: str) -> None:
 224        """
 225        Update `labels_file`.
 226        """
 227        self.__labels_file = x
 228
 229    @property
 230    def ancestry(self) -> Optional[str]:
 231        """
 232        Retrieve `ancestry`.
 233        
 234        Returns:
 235            **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
 236        """
 237        return self.__ancestry
 238
 239    @ancestry.setter
 240    def ancestry(self, x: str) -> None:
 241        """
 242        Update `ancestry`.
 243        """
 244        self.__ancestry = x
 245
 246    @property
 247    def is_masked(self) -> bool:
 248        """
 249        Retrieve `is_masked`.
 250        
 251        Returns:
 252            **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
 253        """
 254        return self.__is_masked
 255
 256    @is_masked.setter
 257    def is_masked(self, x: bool) -> None:
 258        """
 259        Update `is_masked`.
 260        """
 261        self.__is_masked = x
 262
 263    @property
 264    def prob_thresh(self) -> float:
 265        """
 266        Retrieve `prob_thresh`.
 267        
 268        Returns:
 269            **float:** Minimum probability threshold for a SNP to belong to an ancestry.
 270        """
 271        return self.__prob_thresh
 272
 273    @prob_thresh.setter
 274    def prob_thresh(self, x: float) -> None:
 275        """Update `prob_thresh`.
 276        """
 277        self.__prob_thresh = x
 278
 279    @property
 280    def average_strands(self) -> bool:
 281        """
 282        Retrieve `average_strands`.
 283        
 284        Returns:
 285            **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
 286        """
 287        return self.__average_strands
 288
 289    @average_strands.setter
 290    def average_strands(self, x: bool) -> None:
 291        """
 292        Update `average_strands`.
 293        """
 294        self.__average_strands = x
 295
 296    @property
 297    def is_weighted(self) -> bool:
 298        """
 299        Retrieve `is_weighted`.
 300        
 301        Returns:
 302            **bool:** True if weights are provided in the labels file, or False otherwise.
 303        """
 304        return self.__is_weighted
 305
 306    @is_weighted.setter
 307    def is_weighted(self, x: bool) -> None:
 308        """
 309        Update `is_weighted`.
 310        """
 311        self.__is_weighted = x
 312
 313    @property
 314    def groups_to_remove(self) -> Dict[int, List[str]]:
 315        """
 316        Retrieve `groups_to_remove`.
 317        
 318        Returns:
 319            **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 
 320                lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`.
 321        """
 322        return self.__groups_to_remove
 323
 324    @groups_to_remove.setter
 325    def groups_to_remove(self, x: Dict[int, List[str]]) -> None:
 326        """
 327        Update `groups_to_remove`.
 328        """
 329        self.__groups_to_remove = x
 330
 331    @property
 332    def min_percent_snps(self) -> float:
 333        """
 334        Retrieve `min_percent_snps`.
 335        
 336        Returns:
 337            **float:** 
 338                Minimum percentage of SNPs that must be known for an individual to be included in the analysis.
 339                All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
 340        """
 341        return self.__min_percent_snps
 342
 343    @min_percent_snps.setter
 344    def min_percent_snps(self, x: float) -> None:
 345        """
 346        Update `min_percent_snps`.
 347        """
 348        self.__min_percent_snps = x
 349
 350    @property
 351    def save_masks(self) -> bool:
 352        """
 353        Retrieve `save_masks`.
 354        
 355        Returns:
 356            **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise.
 357        """
 358        return self.__save_masks
 359
 360    @save_masks.setter
 361    def save_masks(self, x: bool) -> None:
 362        """
 363        Update `save_masks`.
 364        """
 365        self.__save_masks = x
 366
 367    @property
 368    def load_masks(self) -> bool:
 369        """
 370        Retrieve `load_masks`.
 371        
 372        Returns:
 373            **bool:** 
 374                True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 
 375                by `masks_file`, or False otherwise.
 376        """
 377        return self.__load_masks
 378
 379    @load_masks.setter
 380    def load_masks(self, x: bool) -> None:
 381        """
 382        Update `load_masks`.
 383        """
 384        self.__load_masks = x
 385
 386    @property
 387    def masks_file(self) -> Union[str, pathlib.Path]:
 388        """
 389        Retrieve `masks_file`.
 390        
 391        Returns:
 392            **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices.
 393        """
 394        return self.__masks_file
 395
 396    @masks_file.setter
 397    def masks_file(self, x: Union[str, pathlib.Path]) -> None:
 398        """
 399        Update `masks_file`.
 400        """
 401        self.__masks_file = x
 402
 403    @property
 404    def output_file(self) -> Union[str, pathlib.Path]:
 405        """
 406        Retrieve `output_file`.
 407        
 408        Returns:
 409            **str or pathlib.Path:** Path to the output `.tsv` file where mdPCA results are saved.
 410        """
 411        return self.__output_file
 412
 413    @output_file.setter
 414    def output_file(self, x: Union[str, pathlib.Path]) -> None:
 415        """
 416        Update `output_file`.
 417        """
 418        self.__output_file = x
 419
 420    @property
 421    def covariance_matrix_file(self) -> Optional[str]:
 422        """
 423        Retrieve `covariance_matrix_file`.
 424        
 425        Returns:
 426            **str:** Path to save the covariance matrix file in `.npy` format.
 427        """
 428        return self.__covariance_matrix_file
 429    
 430    @covariance_matrix_file.setter
 431    def covariance_matrix_file(self, x: Optional[str]) -> None:
 432        """
 433        Update `covariance_matrix_file`.
 434        """
 435        self.__covariance_matrix_file = x
 436
 437    @property
 438    def n_components(self) -> int:
 439        """
 440        Retrieve `n_components`.
 441        
 442        Returns:
 443            **int:** The number of principal components.
 444        """
 445        return self.__n_components
 446
 447    @n_components.setter
 448    def n_components(self, x: int) -> None:
 449        """
 450        Update `n_components`.
 451        """
 452        self.__n_components = x
 453
 454    @property
 455    def rsid_or_chrompos(self) -> int:
 456        """
 457        Retrieve `rsid_or_chrompos`.
 458        
 459        Returns:
 460            **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`.
 461        """
 462        return self.__rsid_or_chrompos
 463
 464    @rsid_or_chrompos.setter
 465    def rsid_or_chrompos(self, x: int) -> None:
 466        """
 467        Update `rsid_or_chrompos`.
 468        """
 469        self.__rsid_or_chrompos = x
 470
 471    @property
 472    def percent_vals_masked(self) -> float:
 473        """
 474        Retrieve `percent_vals_masked`.
 475        
 476        Returns:
 477            **float:** 
 478                Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 
 479                `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`.
 480        """
 481        return self.__percent_vals_masked
 482
 483    @percent_vals_masked.setter
 484    def percent_vals_masked(self, x: float) -> None:
 485        """
 486        Update `percent_vals_masked`.
 487        """
 488        self.__percent_vals_masked = x
 489
 490    @property
 491    def X_new_(self) -> Optional[np.ndarray]:
 492        """
 493        Retrieve `X_new_`.
 494
 495        Returns:
 496            **array of shape (n_samples, n_components):** 
 497                The transformed SNP data projected onto the `n_components` principal components.
 498                n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 
 499                (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 
 500                `(n_samples * 2, n_components)`.
 501        """
 502        return self.__X_new_
 503
 504    @X_new_.setter
 505    def X_new_(self, x: np.ndarray) -> None:
 506        """
 507        Update `X_new_`.
 508        """
 509        self.__X_new_ = x
 510
 511    @property
 512    def haplotypes_(self) -> Optional[List[str]]:
 513        """
 514        Retrieve `haplotypes_`.
 515
 516        Returns:
 517            list of str:
 518                A list of unique haplotype identifiers.
 519        """
 520        if isinstance(self.__haplotypes_, np.ndarray):
 521            return self.__haplotypes_.ravel().tolist()  # Flatten and convert NumPy array to a list
 522        elif isinstance(self.__haplotypes_, list):
 523            if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray):
 524                return self.__haplotypes_[0].ravel().tolist()  # Handle list containing a single array
 525            return self.__haplotypes_  # Already a flat list
 526        elif self.__haplotypes_ is None:
 527            return None  # If no haplotypes are set
 528        else:
 529            raise TypeError("`haplotypes_` must be a list or a NumPy array.")
 530
 531    @haplotypes_.setter
 532    def haplotypes_(self, x: Union[np.ndarray, List[str]]) -> None:
 533        """
 534        Update `haplotypes_`.
 535        """
 536        if isinstance(x, np.ndarray):
 537            self.__haplotypes_ = x.ravel().tolist()  # Flatten and convert to a list
 538        elif isinstance(x, list):
 539            if len(x) == 1 and isinstance(x[0], np.ndarray):  # Handle list containing a single array
 540                self.__haplotypes_ = x[0].ravel().tolist()
 541            else:
 542                self.__haplotypes_ = x  # Use directly if already a list
 543        else:
 544            raise TypeError("`x` must be a list or a NumPy array.")
 545
 546    @property
 547    def samples_(self) -> Optional[List[str]]:
 548        """
 549        Retrieve `samples_`.
 550
 551        Returns:
 552            list of str:
 553                A list of sample identifiers based on `haplotypes_` and `average_strands`.
 554        """
 555        haplotypes = self.haplotypes_
 556        if haplotypes is None:
 557            return None
 558        if self.__average_strands:
 559            return haplotypes
 560        else:
 561            return [x[:-2] for x in haplotypes]
 562
 563    @property
 564    def n_haplotypes(self) -> Optional[int]:
 565        """
 566        Retrieve `n_haplotypes`.
 567
 568        Returns:
 569            **int:**
 570                The total number of haplotypes, potentially reduced if filtering is applied 
 571                (`min_percent_snps > 0`).
 572        """
 573        return len(self.haplotypes_)
 574
 575    @property
 576    def n_samples(self) -> Optional[int]:
 577        """
 578        Retrieve `n_samples`.
 579
 580        Returns:
 581            **int:**
 582                The total number of samples, potentially reduced if filtering is applied 
 583                (`min_percent_snps > 0`).
 584        """
 585        return len(np.unique(self.samples_))
 586
 587    def copy(self) -> 'mdPCA':
 588        """
 589        Create and return a copy of `self`.
 590
 591        Returns:
 592            **mdPCA:** 
 593                A new instance of the current object.
 594        """
 595        return copy.copy(self)
 596
 597    def _process_masks(self, masks, rs_ID_list, ind_ID_list):
 598        masked_matrix = masks[0][self.ancestry].T
 599        rs_IDs = rs_ID_list[0]
 600        ind_IDs = ind_ID_list[0]
 601        return masked_matrix, rs_IDs, ind_IDs
 602
 603    def _load_mask_file(self):
 604        mask_files = np.load(self.masks_file, allow_pickle=True)
 605        masks = mask_files['masks']
 606        rs_ID_list = mask_files['rs_ID_list']
 607        ind_ID_list = mask_files['ind_ID_list']
 608        labels = mask_files['labels']
 609        weights = mask_files['weights']
 610        return masks, rs_ID_list, ind_ID_list, labels, weights
 611
 612    @staticmethod
 613    def _compute_strength_vector(X):
 614        strength_vector = np.sum(~np.isnan(X), axis=1) / X.shape[1]
 615        return strength_vector
 616
 617    @staticmethod
 618    def _compute_strength_matrix(X):
 619        notmask = (~np.isnan(X)).astype(np.float32)
 620        strength_matrix = np.dot(notmask, notmask.T)
 621        strength_matrix /= X.shape[1]
 622        return strength_matrix
 623
 624    def _cov(self, x):
 625        ddof = 1
 626        x = np.ma.array(x, ndmin=2, copy=True, dtype=np.float32)
 627        xmask = np.ma.getmaskarray(x)
 628        xnotmask = np.logical_not(xmask).astype(np.float32)
 629        fact = np.dot(xnotmask, xnotmask.T) * 1. - ddof
 630        del xnotmask
 631        gc.collect()
 632        result = (np.ma.dot(x, x.T, strict=False) / fact).squeeze()
 633        x = x.data
 634        strength_vec = self._compute_strength_vector(x)
 635        strength_mat = self._compute_strength_matrix(x)
 636        return result.data, strength_vec, strength_mat
 637
 638    @staticmethod
 639    def _demean(S, w):
 640        w_sum = np.matmul(w, S)
 641        w_rowsum = w_sum.reshape((1, S.shape[0]))
 642        w_colsum = w_sum.reshape((S.shape[0], 1))
 643        w_totalsum = np.dot(w_sum, w)
 644        S -= (w_rowsum + w_colsum) - w_totalsum
 645        return S
 646
 647    @staticmethod
 648    def _iterative_svd_matrix_completion(cov):
 649        num_masked = np.isnan(cov).sum()
 650        if num_masked > 0:
 651            start_time = time.time()
 652            start_rank = 1
 653            end_rank = 10
 654            rank = 5
 655            choose_best = True
 656            num_cores = 5
 657            cov_complete = IterativeSVD(start_rank=start_rank, end_rank=end_rank, rank=rank, choose_best=choose_best, num_cores=num_cores).fit_transform(cov)
 658            logging.info("Iterative SVD --- %s seconds ---" % (time.time() - start_time))
 659        else:
 660            cov_complete = cov
 661        return cov_complete
 662
 663    def _weighted_cov_pca(self, X_incomplete, weights, covariance_matrix_file, n_components):
 664        start_time = time.time()
 665        X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 666        S, _, _ = self._cov(X_incomplete)
 667        logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
 668        weights_normalized = weights / weights.sum()
 669        S = self._demean(S, weights_normalized)
 670        W = np.diag(weights)
 671        WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W))
 672        if covariance_matrix_file:
 673            np.save(covariance_matrix_file, S.data)
 674        svd = TruncatedSVD(n_components, algorithm="arpack")
 675        svd.fit(WSW)
 676        U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T
 677        D = np.diag(np.sqrt(svd.singular_values_))
 678        T = np.matmul(U, D)
 679        total_var = np.trace(S)
 680        for i in range(n_components):
 681            pc_percentvar = 100 * svd.singular_values_[i] / total_var
 682            logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
 683        return T
 684
 685    @staticmethod
 686    def _create_validation_mask(X_incomplete, percent_inds):
 687        np.random.seed(1)
 688        masked_rows = np.isnan(X_incomplete).any(axis=1)
 689        masked_inds = np.flatnonzero(masked_rows)
 690        X_masked = X_incomplete[masked_rows] 
 691        percent_masked = 100 * np.isnan(X_masked).sum() / (X_masked.shape[0] * X_masked.shape[1])
 692        unmasked_rows = ~masked_rows
 693        X_unmasked = X_incomplete[unmasked_rows]
 694        masked_rows = np.random.choice(range(X_unmasked.shape[0]), size=int(X_unmasked.shape[0] * percent_inds / 100), replace=False)
 695        X_masked_rows = X_unmasked[masked_rows,:]
 696        mask = np.zeros(X_masked_rows.shape[0] * X_masked_rows.shape[1], dtype=np.int8)
 697        mask[:int(X_masked_rows.shape[0] * X_masked_rows.shape[1] * percent_masked / 100)] = 1
 698        np.random.shuffle(mask)
 699        mask = mask.astype(bool)
 700        mask = mask.reshape(X_masked_rows.shape)
 701        X_masked_rows[mask] = np.nan
 702        X_unmasked[masked_rows] = X_masked_rows
 703        X_incomplete[unmasked_rows] = X_unmasked
 704        masked_rows_new = np.isnan(X_incomplete).any(axis=1)
 705        masked_inds_new = np.flatnonzero(masked_rows_new)
 706        masked_inds_val = sorted(list(set(masked_inds_new) - set(masked_inds)))
 707        return X_incomplete, masked_inds_val
 708
 709    @staticmethod    
 710    def _create_cov_mask(cov, strength_mat, percent_vals_masked):
 711        cov_flattened = cov.reshape(-1).copy()
 712        num_vals_masked = int(percent_vals_masked * len(cov_flattened) / 100)
 713        pos_masked = strength_mat.reshape(-1).argsort()[:num_vals_masked]
 714        cov_flattened[pos_masked] = np.nan
 715        cov_masked = cov_flattened.reshape(cov.shape)
 716        return cov_masked
 717
 718    def _regularized_optimization_ils(self, X_incomplete, covariance_matrix_file, n_components):
 719        def run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file):
 720            start_time = time.time()
 721            X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 722            has_missing = np.isnan(X_incomplete.data).sum() > 0
 723            S, _, strength_mat = self._cov(X_incomplete)
 724            logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
 725            S = self._demean(S, np.ones(S.shape[0]) / S.shape[0])
 726            if has_missing: 
 727                logging.info("Starting matrix completion. This will take a few minutes...")
 728                start_time = time.time()
 729                percent_inds_val = 20 # Percent of unmasked individuals to be masked for cross-validation 
 730                X_incomplete, masked_inds_val = self._create_validation_mask(X_incomplete.data, percent_inds_val) # masked_inds_val is the list of indices of the individuals masked for validation
 731                X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 732                S_prime, _, w_mat_prime = self._cov(X_incomplete)
 733                del X_incomplete
 734                gc.collect()
 735                S_prime = self._demean(S_prime, np.ones(S_prime.shape[0]) / S_prime.shape[0])
 736                S_robust, lam = self.matrix_completion(S, strength_mat, S_prime, w_mat_prime, lams=None, method="NN", 
 737                                                cv_inds=masked_inds_val)
 738                logging.info(f"Covariance Matrix --- %{time.time() - start_time:.2}s seconds ---")
 739            else:
 740                S_robust = S.copy()
 741            if covariance_matrix_file:
 742                np.save(covariance_matrix_file, S.data)
 743                if has_missing:
 744                    base, ext = os.path.splitext(covariance_matrix_file)
 745                    np.save(f"{base}_completed_{lam}{ext}", S_robust.data)
 746            return S, S_robust
 747
 748        def _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components):
 749            _, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)))
 750            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
 751            missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0)
 752            missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True)
 753            svd = TruncatedSVD(n_components, algorithm="arpack")
 754            svd.fit(S_robust)
 755            U = svd.components_.T
 756            U_nonmissing = U[nonmissing]
 757            D_nonmissing = np.diag(np.sqrt(svd.singular_values_))
 758            T = np.zeros((X_incomplete.shape[0], n_components))
 759            T_filled = np.matmul(U_nonmissing, D_nonmissing)
 760            i_filled = nonmissing
 761            T[nonmissing] = T_filled
 762            for i in missing_sorted:
 763                S_i = S[i][i_filled]
 764                W_i = np.diag(strength_mat[i][i_filled])
 765                A = np.matmul(W_i, T_filled)
 766                b = np.matmul(W_i, S_i)
 767                t = np.linalg.lstsq(A, b, rcond=None)[0]
 768                T[i] = t
 769                i_filled = np.append(i_filled, i)
 770                T_filled = np.append(T_filled, [t], axis=0)
 771            total_var = np.trace(S)
 772            for i in range(n_components):
 773                pc_percentvar = 100 * svd.singular_values_[i] / total_var
 774                logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
 775            return T
 776        
 777        S, S_robust = run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file)
 778        T = _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components)
 779        return T
 780
 781    def _cov_matrix_imputation(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components):
 782        start_time = time.time()
 783        X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 784        S, strength_vec, strength_mat = self._cov(X_incomplete)
 785        logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
 786        S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked)
 787        S = self._iterative_svd_matrix_completion(S_masked)
 788        weights = np.ones(len(strength_vec))
 789        weights_normalized = weights / weights.sum()
 790        S = self._demean(S, weights_normalized)
 791        W = np.diag(weights)
 792        WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W))
 793        if covariance_matrix_file:
 794            np.save(covariance_matrix_file, S.data)
 795        svd = TruncatedSVD(n_components, algorithm="arpack")
 796        svd.fit(WSW)
 797        U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T
 798        D = np.diag(np.sqrt(svd.singular_values_))
 799        T = np.matmul(U, D)
 800        total_var = np.trace(S)
 801        for i in range(n_components):
 802            pc_percentvar = 100 * svd.singular_values_[i] / total_var
 803            logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
 804        return T
 805
 806    def _cov_matrix_imputation_ils(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components):
 807        
 808        def run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file):
 809            start_time = time.time()
 810            X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 811            S, strength_vec, strength_mat = self._cov(X_incomplete)
 812            logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
 813            S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked)
 814            S = self._iterative_svd_matrix_completion(S_masked)
 815            weights = np.ones(len(strength_vec))
 816            weights_normalized = weights / weights.sum()
 817            S = self._demean(S, weights_normalized)
 818            W = np.diag(weights)
 819            if covariance_matrix_file:
 820                np.save(covariance_matrix_file, S.data)
 821            return S, W
 822
 823        def compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components):
 824            S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)))
 825            S = self._demean(S, np.ones(S.shape[0]) / S.shape[0])
 826            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
 827            missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0)
 828            missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True)
 829            svd = TruncatedSVD(n_components, algorithm="arpack")
 830            WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W))
 831            svd.fit(WSW)
 832            U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T
 833            U_nonmissing = U[nonmissing]
 834            D_nonmissing = np.diag(np.sqrt(svd.singular_values_))
 835            T = np.zeros((X_incomplete.shape[0], n_components))
 836            T_filled = np.matmul(U_nonmissing, D_nonmissing)
 837            i_filled = nonmissing
 838            T[nonmissing] = T_filled
 839            for i in missing_sorted:
 840                S_i = S[i][i_filled]
 841                W_i = np.diag(strength_mat[i][i_filled])
 842                A = np.matmul(W_i, T_filled)
 843                b = np.matmul(W_i, S_i)
 844                t = np.linalg.lstsq(A, b, rcond=None)[0]
 845                T[i] = t
 846                i_filled = np.append(i_filled, i)
 847                T_filled = np.append(T_filled, [t], axis=0)
 848            total_var = np.trace(S)
 849            for i in range(n_components):
 850                pc_percentvar = 100 * svd.singular_values_[i] / total_var
 851                logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
 852            return T
 853        
 854        S, W = run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file)
 855        T = compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components)
 856        return T
 857
 858    def _nonmissing_pca_ils(self, X_incomplete, covariance_matrix_file, n_components):
 859    
 860        def run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file):
 861            start_time = time.time()
 862            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
 863            X_incomplete_nonmissing = X_incomplete[nonmissing]
 864            X_incomplete_nonmissing = np.ma.array(X_incomplete_nonmissing, mask=np.isnan(X_incomplete_nonmissing))
 865            S_nonmissing, _, _ = self._cov(X_incomplete_nonmissing)
 866            logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
 867            S_nonmissing = self._demean(S_nonmissing, np.ones(S_nonmissing.shape[0]) / S_nonmissing.shape[0])    
 868            if covariance_matrix_file:
 869                np.save(covariance_matrix_file, S_nonmissing.data)
 870            return S_nonmissing
 871        
 872        def compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components):
 873            S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)))
 874            S = self._demean(S, np.ones(S.shape[0]) / S.shape[0])
 875            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
 876            missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0)
 877            missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True)
 878            svd = TruncatedSVD(n_components, algorithm="arpack")
 879            svd.fit(S_nonmissing)
 880            U_nonmissing = svd.components_.T
 881            D_nonmissing = np.diag(np.sqrt(svd.singular_values_))
 882            T = np.zeros((X_incomplete.shape[0], n_components))
 883            T_filled = np.matmul(U_nonmissing, D_nonmissing)
 884            i_filled = nonmissing
 885            T[nonmissing] = T_filled
 886            for i in missing_sorted:
 887                S_i = S[i][i_filled]
 888                W_i = np.diag(strength_mat[i][i_filled])
 889                A = np.matmul(W_i, T_filled)
 890                b = np.matmul(W_i, S_i)
 891                t = np.linalg.lstsq(A, b, rcond=None)[0]
 892                T[i] = t
 893                i_filled = np.append(i_filled, i)
 894                T_filled = np.append(T_filled, [t], axis=0)
 895            total_var = np.trace(S)
 896            for i in range(n_components):
 897                pc_percentvar = 100 * svd.singular_values_[i] / total_var
 898                logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
 899            return T
 900        
 901        S_nonmissing = run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file)
 902        T = compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components)
 903        return T
 904
 905    def _run_cov_matrix(self, X_incomplete, weights):
 906        """
 907        Runs the specified PCA method on the incomplete data.
 908        """
 909        if self.method == "weighted_cov_pca":
 910            return self._weighted_cov_pca(X_incomplete, weights, self.covariance_matrix_file, self.n_components)
 911        elif self.method == "regularized_optimization_ils":
 912            return self._regularized_optimization_ils(X_incomplete, self.covariance_matrix_file, self.n_components)
 913        elif self.method == "cov_matrix_imputation":
 914            return self._cov_matrix_imputation(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components)
 915        elif self.method == "cov_matrix_imputation_ils":
 916            return self._cov_matrix_imputation_ils(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components)
 917        elif self.method == "nonmissing_pca_ils":
 918            return self._nonmissing_pca_ils(X_incomplete, self.covariance_matrix_file, self.n_components)
 919        else:
 920            raise ValueError(f"Unsupported method: {self.method}")
 921
 922    def fit_transform(
 923            self,
 924            snpobj: Optional['SNPObject'] = None, 
 925            laiobj: Optional['LocalAncestryObject'] = None,
 926            labels_file: Optional[str] = None,
 927            ancestry: Optional[str] = None,
 928            average_strands: Optional[bool] = None
 929        ) -> np.ndarray:
 930        """
 931        Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data.
 932        
 933        This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying 
 934        masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these 
 935        masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality 
 936        while handling missing data as specified.
 937
 938        Args:
 939            snpobj (SNPObject, optional): 
 940                A SNPObject instance.
 941            laiobj (LAIObject, optional): 
 942                A LAIObject instance.
 943            labels_file (str, optional): 
 944                Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 
 945                column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 
 946                weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 
 947                combined into groups, with respective weights.
 948            ancestry (str, optional): 
 949                Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
 950            average_strands (bool, optional): 
 951                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
 952                If None, defaults to `self.average_strands`.
 953
 954        Returns:
 955            **array of shape (n_samples, n_components):** 
 956                The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`.
 957        """
 958        if snpobj is None:
 959            snpobj = self.snpobj
 960        if laiobj is None:
 961            laiobj = self.laiobj
 962        if labels_file is None:
 963            labels_file = self.labels_file
 964        if ancestry is None:
 965            ancestry = self.ancestry
 966        if average_strands is None:
 967            average_strands = self.average_strands
 968        
 969        if self.load_masks:
 970            masks, rs_id_list, ind_id_list, _, weights = self._load_mask_file()
 971        else:
 972            masks, rs_id_list, ind_id_list = array_process(
 973                self.snpobj,
 974                self.laiobj,
 975                self.average_strands,
 976                self.prob_thresh,
 977                self.is_masked,
 978                self.rsid_or_chrompos
 979            )
 980
 981            masks, ind_id_list, _, weights = process_labels_weights(
 982                self.labels_file,
 983                masks,
 984                rs_id_list,
 985                ind_id_list,
 986                self.average_strands,
 987                self.ancestry,
 988                self.min_percent_snps,
 989                self.groups_to_remove,
 990                self.is_weighted,
 991                self.save_masks,
 992                self.masks_file
 993            )
 994
 995        X_incomplete, _, _ = self._process_masks(masks, rs_id_list, ind_id_list)
 996
 997        # Call run_cov_matrix with the specified method
 998        self.X_new_ = self._run_cov_matrix(
 999            X_incomplete,
1000            weights
1001        )
1002
1003        self.haplotypes_ = ind_id_list
1004
1005        return self.X_new_
class mdPCA:
  19class mdPCA:
  20    """
  21    A class for missing data principal component analysis (mdPCA).
  22
  23    This class supports both separate and averaged strand processing for SNP data. If the `snpobj`, 
  24    `laiobj`, `labels_file`, and `ancestry` parameters are all provided during instantiation, 
  25    the `fit_transform` method will be automatically called, applying the specified mdPCA method to transform 
  26    the data upon instantiation.
  27    """
  28    def __init__(
  29        self,
  30        method: str = 'weighted_cov_pca',
  31        snpobj: Optional['SNPObject'] = None,
  32        laiobj: Optional['LocalAncestryObject'] = None,
  33        labels_file: Optional[str] = None,
  34        ancestry: Optional[str] = None,
  35        is_masked: bool = True,
  36        prob_thresh: float = 0,
  37        average_strands: bool = False,
  38        is_weighted: bool = False,
  39        groups_to_remove: Dict[int, List[str]] = {},
  40        min_percent_snps: float = 4,
  41        save_masks: bool = False,
  42        load_masks: bool = False,
  43        masks_file: Union[str, pathlib.Path] = 'masks.npz',
  44        output_file: Union[str, pathlib.Path] = 'output.tsv',
  45        covariance_matrix_file: Optional[str] = None,
  46        n_components: int = 2,
  47        rsid_or_chrompos: int = 2,
  48        percent_vals_masked: float = 0
  49    ):
  50        """
  51        Args:
  52            method (str, default='weighted_cov_pca'): 
  53                The PCA method to use for dimensionality reduction. Options include:
  54                - `'weighted_cov_pca'`: 
  55                    Simple covariance-based PCA, weighted by sample strengths.
  56                - `'regularized_optimization_ils'`: 
  57                    Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of 
  58                    missing samples using the original covariance matrix (considering only relevant elements not missing in 
  59                    the original covariance matrix for those samples).
  60                - `'cov_matrix_imputation'`: 
  61                    Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values 
  62                    using the Iterative SVD imputation method.
  63                - `'cov_matrix_imputation_ils'`: 
  64                    The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space 
  65                    given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just 
  66                    as done in 'regularized_optimization_ils'.
  67                - `'nonmissing_pca_ils'`: 
  68                    The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto 
  69                    the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as 
  70                    done in 'regularized_optimization_ils'.
  71            snpobj (SNPObject, optional): 
  72                A SNPObject instance.
  73            laiobj (LAIObject, optional): 
  74                A LAIObject instance.
  75            labels_file (str, optional): 
  76                Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 
  77                column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 
  78                weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 
  79                combined into groups, with respective weights.
  80            ancestry (str, optional): 
  81                Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`.
  82            is_masked (bool, default=True): 
  83                True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
  84            prob_thresh (float, default=0): 
  85                Minimum probability threshold for a SNP to belong to an ancestry.
  86            average_strands (bool, default=False): 
  87                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
  88            is_weighted (bool, default=False): 
  89                True if weights are provided in the labels file, or False otherwise.
  90            groups_to_remove (dict of int to list of str, default={}): 
  91                Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 
  92                lists of groups to remove for each array.
  93                Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`.
  94            min_percent_snps (float, default=4): 
  95                Minimum percentage of SNPs that must be known for an individual to be included in the analysis.
  96                All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
  97            save_masks (bool, default=False): 
  98                True if the masked matrices are to be saved in a `.npz` file, or False otherwise.
  99            load_masks (bool, default=False): 
 100                True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise.
 101            masks_file (str or pathlib.Path, default='masks.npz'): 
 102                Path to the `.npz` file used for saving/loading masked matrices.
 103            output_file (str or pathlib.Path, default='output.tsv'): 
 104                Path to the output `.tsv` file where mdPCA results are saved.
 105            covariance_matrix_file (str, optional): 
 106                Path to save the covariance matrix file in `.npy` format. If None, the covariance matrix is not saved. Default is None.
 107            n_components (int, default=2): 
 108                The number of principal components.
 109            rsid_or_chrompos (int, default=2): 
 110                Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`.
 111            percent_vals_masked (float, default=0): 
 112                Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 
 113                `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`.
 114        """
 115        self.__snpobj = snpobj
 116        self.__laiobj = laiobj
 117        self.__labels_file = labels_file
 118        self.__ancestry = ancestry
 119        self.__method = method
 120        self.__is_masked = is_masked
 121        self.__prob_thresh = prob_thresh
 122        self.__average_strands = average_strands
 123        self.__is_weighted = is_weighted
 124        self.__groups_to_remove = groups_to_remove
 125        self.__min_percent_snps = min_percent_snps
 126        self.__save_masks = save_masks
 127        self.__load_masks = load_masks
 128        self.__masks_file = masks_file
 129        self.__output_file = output_file
 130        self.__covariance_matrix_file = covariance_matrix_file
 131        self.__n_components = n_components
 132        self.__rsid_or_chrompos = rsid_or_chrompos
 133        self.__percent_vals_masked = percent_vals_masked
 134        self.__X_new_ = None  # Store transformed SNP data
 135        self.__haplotypes_ = None  # Store haplotypes after filtering if min_percent_snps > 0
 136        self.__samples_ = None  # Store samples after filtering if min_percent_snps > 0
 137
 138        # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided
 139        if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None:
 140            self.fit_transform(snpobj, laiobj, labels_file, ancestry)
 141
 142    def __getitem__(self, key):
 143        """
 144        To access an attribute of the class using the square bracket notation,
 145        similar to a dictionary.
 146        """
 147        try:
 148            return getattr(self, key)
 149        except AttributeError:
 150            raise KeyError(f'Invalid key: {key}')
 151
 152    def __setitem__(self, key, value):
 153        """
 154        To set an attribute of the class using the square bracket notation,
 155        similar to a dictionary.
 156        """
 157        try:
 158            setattr(self, key, value)
 159        except AttributeError:
 160            raise KeyError(f'Invalid key: {key}')
 161
 162    @property
 163    def method(self) -> str:
 164        """
 165        Retrieve `method`.
 166        
 167        Returns:
 168            **str:** The PCA method to use for dimensionality reduction.
 169        """
 170        return self.__method
 171
 172    @method.setter
 173    def method(self, x: str) -> None:
 174        """
 175        Update `method`.
 176        """
 177        self.__method = x
 178
 179    @property
 180    def snpobj(self) -> Optional['SNPObject']:
 181        """
 182        Retrieve `snpobj`.
 183        
 184        Returns:
 185            **SNPObject:** A SNPObject instance.
 186        """
 187        return self.__snpobj
 188
 189    @snpobj.setter
 190    def snpobj(self, x: 'SNPObject') -> None:
 191        """
 192        Update `snpobj`.
 193        """
 194        self.__snpobj = x
 195
 196    @property
 197    def laiobj(self) -> Optional['LocalAncestryObject']:
 198        """
 199        Retrieve `laiobj`.
 200        
 201        Returns:
 202            **LocalAncestryObject:** A LocalAncestryObject instance.
 203        """
 204        return self.__laiobj
 205
 206    @laiobj.setter
 207    def laiobj(self, x: 'LocalAncestryObject') -> None:
 208        """
 209        Update `laiobj`.
 210        """
 211        self.__laiobj = x
 212
 213    @property
 214    def labels_file(self) -> Optional[str]:
 215        """
 216        Retrieve `labels_file`.
 217        
 218        Returns:
 219            **str:** Path to the labels file in `.tsv` format.
 220        """
 221        return self.__labels_file
 222
 223    @labels_file.setter
 224    def labels_file(self, x: str) -> None:
 225        """
 226        Update `labels_file`.
 227        """
 228        self.__labels_file = x
 229
 230    @property
 231    def ancestry(self) -> Optional[str]:
 232        """
 233        Retrieve `ancestry`.
 234        
 235        Returns:
 236            **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
 237        """
 238        return self.__ancestry
 239
 240    @ancestry.setter
 241    def ancestry(self, x: str) -> None:
 242        """
 243        Update `ancestry`.
 244        """
 245        self.__ancestry = x
 246
 247    @property
 248    def is_masked(self) -> bool:
 249        """
 250        Retrieve `is_masked`.
 251        
 252        Returns:
 253            **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
 254        """
 255        return self.__is_masked
 256
 257    @is_masked.setter
 258    def is_masked(self, x: bool) -> None:
 259        """
 260        Update `is_masked`.
 261        """
 262        self.__is_masked = x
 263
 264    @property
 265    def prob_thresh(self) -> float:
 266        """
 267        Retrieve `prob_thresh`.
 268        
 269        Returns:
 270            **float:** Minimum probability threshold for a SNP to belong to an ancestry.
 271        """
 272        return self.__prob_thresh
 273
 274    @prob_thresh.setter
 275    def prob_thresh(self, x: float) -> None:
 276        """Update `prob_thresh`.
 277        """
 278        self.__prob_thresh = x
 279
 280    @property
 281    def average_strands(self) -> bool:
 282        """
 283        Retrieve `average_strands`.
 284        
 285        Returns:
 286            **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
 287        """
 288        return self.__average_strands
 289
 290    @average_strands.setter
 291    def average_strands(self, x: bool) -> None:
 292        """
 293        Update `average_strands`.
 294        """
 295        self.__average_strands = x
 296
 297    @property
 298    def is_weighted(self) -> bool:
 299        """
 300        Retrieve `is_weighted`.
 301        
 302        Returns:
 303            **bool:** True if weights are provided in the labels file, or False otherwise.
 304        """
 305        return self.__is_weighted
 306
 307    @is_weighted.setter
 308    def is_weighted(self, x: bool) -> None:
 309        """
 310        Update `is_weighted`.
 311        """
 312        self.__is_weighted = x
 313
 314    @property
 315    def groups_to_remove(self) -> Dict[int, List[str]]:
 316        """
 317        Retrieve `groups_to_remove`.
 318        
 319        Returns:
 320            **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 
 321                lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`.
 322        """
 323        return self.__groups_to_remove
 324
 325    @groups_to_remove.setter
 326    def groups_to_remove(self, x: Dict[int, List[str]]) -> None:
 327        """
 328        Update `groups_to_remove`.
 329        """
 330        self.__groups_to_remove = x
 331
 332    @property
 333    def min_percent_snps(self) -> float:
 334        """
 335        Retrieve `min_percent_snps`.
 336        
 337        Returns:
 338            **float:** 
 339                Minimum percentage of SNPs that must be known for an individual to be included in the analysis.
 340                All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
 341        """
 342        return self.__min_percent_snps
 343
 344    @min_percent_snps.setter
 345    def min_percent_snps(self, x: float) -> None:
 346        """
 347        Update `min_percent_snps`.
 348        """
 349        self.__min_percent_snps = x
 350
 351    @property
 352    def save_masks(self) -> bool:
 353        """
 354        Retrieve `save_masks`.
 355        
 356        Returns:
 357            **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise.
 358        """
 359        return self.__save_masks
 360
 361    @save_masks.setter
 362    def save_masks(self, x: bool) -> None:
 363        """
 364        Update `save_masks`.
 365        """
 366        self.__save_masks = x
 367
 368    @property
 369    def load_masks(self) -> bool:
 370        """
 371        Retrieve `load_masks`.
 372        
 373        Returns:
 374            **bool:** 
 375                True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 
 376                by `masks_file`, or False otherwise.
 377        """
 378        return self.__load_masks
 379
 380    @load_masks.setter
 381    def load_masks(self, x: bool) -> None:
 382        """
 383        Update `load_masks`.
 384        """
 385        self.__load_masks = x
 386
 387    @property
 388    def masks_file(self) -> Union[str, pathlib.Path]:
 389        """
 390        Retrieve `masks_file`.
 391        
 392        Returns:
 393            **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices.
 394        """
 395        return self.__masks_file
 396
 397    @masks_file.setter
 398    def masks_file(self, x: Union[str, pathlib.Path]) -> None:
 399        """
 400        Update `masks_file`.
 401        """
 402        self.__masks_file = x
 403
 404    @property
 405    def output_file(self) -> Union[str, pathlib.Path]:
 406        """
 407        Retrieve `output_file`.
 408        
 409        Returns:
 410            **str or pathlib.Path:** Path to the output `.tsv` file where mdPCA results are saved.
 411        """
 412        return self.__output_file
 413
 414    @output_file.setter
 415    def output_file(self, x: Union[str, pathlib.Path]) -> None:
 416        """
 417        Update `output_file`.
 418        """
 419        self.__output_file = x
 420
 421    @property
 422    def covariance_matrix_file(self) -> Optional[str]:
 423        """
 424        Retrieve `covariance_matrix_file`.
 425        
 426        Returns:
 427            **str:** Path to save the covariance matrix file in `.npy` format.
 428        """
 429        return self.__covariance_matrix_file
 430    
 431    @covariance_matrix_file.setter
 432    def covariance_matrix_file(self, x: Optional[str]) -> None:
 433        """
 434        Update `covariance_matrix_file`.
 435        """
 436        self.__covariance_matrix_file = x
 437
 438    @property
 439    def n_components(self) -> int:
 440        """
 441        Retrieve `n_components`.
 442        
 443        Returns:
 444            **int:** The number of principal components.
 445        """
 446        return self.__n_components
 447
 448    @n_components.setter
 449    def n_components(self, x: int) -> None:
 450        """
 451        Update `n_components`.
 452        """
 453        self.__n_components = x
 454
 455    @property
 456    def rsid_or_chrompos(self) -> int:
 457        """
 458        Retrieve `rsid_or_chrompos`.
 459        
 460        Returns:
 461            **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`.
 462        """
 463        return self.__rsid_or_chrompos
 464
 465    @rsid_or_chrompos.setter
 466    def rsid_or_chrompos(self, x: int) -> None:
 467        """
 468        Update `rsid_or_chrompos`.
 469        """
 470        self.__rsid_or_chrompos = x
 471
 472    @property
 473    def percent_vals_masked(self) -> float:
 474        """
 475        Retrieve `percent_vals_masked`.
 476        
 477        Returns:
 478            **float:** 
 479                Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 
 480                `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`.
 481        """
 482        return self.__percent_vals_masked
 483
 484    @percent_vals_masked.setter
 485    def percent_vals_masked(self, x: float) -> None:
 486        """
 487        Update `percent_vals_masked`.
 488        """
 489        self.__percent_vals_masked = x
 490
 491    @property
 492    def X_new_(self) -> Optional[np.ndarray]:
 493        """
 494        Retrieve `X_new_`.
 495
 496        Returns:
 497            **array of shape (n_samples, n_components):** 
 498                The transformed SNP data projected onto the `n_components` principal components.
 499                n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 
 500                (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 
 501                `(n_samples * 2, n_components)`.
 502        """
 503        return self.__X_new_
 504
 505    @X_new_.setter
 506    def X_new_(self, x: np.ndarray) -> None:
 507        """
 508        Update `X_new_`.
 509        """
 510        self.__X_new_ = x
 511
 512    @property
 513    def haplotypes_(self) -> Optional[List[str]]:
 514        """
 515        Retrieve `haplotypes_`.
 516
 517        Returns:
 518            list of str:
 519                A list of unique haplotype identifiers.
 520        """
 521        if isinstance(self.__haplotypes_, np.ndarray):
 522            return self.__haplotypes_.ravel().tolist()  # Flatten and convert NumPy array to a list
 523        elif isinstance(self.__haplotypes_, list):
 524            if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray):
 525                return self.__haplotypes_[0].ravel().tolist()  # Handle list containing a single array
 526            return self.__haplotypes_  # Already a flat list
 527        elif self.__haplotypes_ is None:
 528            return None  # If no haplotypes are set
 529        else:
 530            raise TypeError("`haplotypes_` must be a list or a NumPy array.")
 531
 532    @haplotypes_.setter
 533    def haplotypes_(self, x: Union[np.ndarray, List[str]]) -> None:
 534        """
 535        Update `haplotypes_`.
 536        """
 537        if isinstance(x, np.ndarray):
 538            self.__haplotypes_ = x.ravel().tolist()  # Flatten and convert to a list
 539        elif isinstance(x, list):
 540            if len(x) == 1 and isinstance(x[0], np.ndarray):  # Handle list containing a single array
 541                self.__haplotypes_ = x[0].ravel().tolist()
 542            else:
 543                self.__haplotypes_ = x  # Use directly if already a list
 544        else:
 545            raise TypeError("`x` must be a list or a NumPy array.")
 546
 547    @property
 548    def samples_(self) -> Optional[List[str]]:
 549        """
 550        Retrieve `samples_`.
 551
 552        Returns:
 553            list of str:
 554                A list of sample identifiers based on `haplotypes_` and `average_strands`.
 555        """
 556        haplotypes = self.haplotypes_
 557        if haplotypes is None:
 558            return None
 559        if self.__average_strands:
 560            return haplotypes
 561        else:
 562            return [x[:-2] for x in haplotypes]
 563
 564    @property
 565    def n_haplotypes(self) -> Optional[int]:
 566        """
 567        Retrieve `n_haplotypes`.
 568
 569        Returns:
 570            **int:**
 571                The total number of haplotypes, potentially reduced if filtering is applied 
 572                (`min_percent_snps > 0`).
 573        """
 574        return len(self.haplotypes_)
 575
 576    @property
 577    def n_samples(self) -> Optional[int]:
 578        """
 579        Retrieve `n_samples`.
 580
 581        Returns:
 582            **int:**
 583                The total number of samples, potentially reduced if filtering is applied 
 584                (`min_percent_snps > 0`).
 585        """
 586        return len(np.unique(self.samples_))
 587
 588    def copy(self) -> 'mdPCA':
 589        """
 590        Create and return a copy of `self`.
 591
 592        Returns:
 593            **mdPCA:** 
 594                A new instance of the current object.
 595        """
 596        return copy.copy(self)
 597
 598    def _process_masks(self, masks, rs_ID_list, ind_ID_list):
 599        masked_matrix = masks[0][self.ancestry].T
 600        rs_IDs = rs_ID_list[0]
 601        ind_IDs = ind_ID_list[0]
 602        return masked_matrix, rs_IDs, ind_IDs
 603
 604    def _load_mask_file(self):
 605        mask_files = np.load(self.masks_file, allow_pickle=True)
 606        masks = mask_files['masks']
 607        rs_ID_list = mask_files['rs_ID_list']
 608        ind_ID_list = mask_files['ind_ID_list']
 609        labels = mask_files['labels']
 610        weights = mask_files['weights']
 611        return masks, rs_ID_list, ind_ID_list, labels, weights
 612
 613    @staticmethod
 614    def _compute_strength_vector(X):
 615        strength_vector = np.sum(~np.isnan(X), axis=1) / X.shape[1]
 616        return strength_vector
 617
 618    @staticmethod
 619    def _compute_strength_matrix(X):
 620        notmask = (~np.isnan(X)).astype(np.float32)
 621        strength_matrix = np.dot(notmask, notmask.T)
 622        strength_matrix /= X.shape[1]
 623        return strength_matrix
 624
 625    def _cov(self, x):
 626        ddof = 1
 627        x = np.ma.array(x, ndmin=2, copy=True, dtype=np.float32)
 628        xmask = np.ma.getmaskarray(x)
 629        xnotmask = np.logical_not(xmask).astype(np.float32)
 630        fact = np.dot(xnotmask, xnotmask.T) * 1. - ddof
 631        del xnotmask
 632        gc.collect()
 633        result = (np.ma.dot(x, x.T, strict=False) / fact).squeeze()
 634        x = x.data
 635        strength_vec = self._compute_strength_vector(x)
 636        strength_mat = self._compute_strength_matrix(x)
 637        return result.data, strength_vec, strength_mat
 638
 639    @staticmethod
 640    def _demean(S, w):
 641        w_sum = np.matmul(w, S)
 642        w_rowsum = w_sum.reshape((1, S.shape[0]))
 643        w_colsum = w_sum.reshape((S.shape[0], 1))
 644        w_totalsum = np.dot(w_sum, w)
 645        S -= (w_rowsum + w_colsum) - w_totalsum
 646        return S
 647
 648    @staticmethod
 649    def _iterative_svd_matrix_completion(cov):
 650        num_masked = np.isnan(cov).sum()
 651        if num_masked > 0:
 652            start_time = time.time()
 653            start_rank = 1
 654            end_rank = 10
 655            rank = 5
 656            choose_best = True
 657            num_cores = 5
 658            cov_complete = IterativeSVD(start_rank=start_rank, end_rank=end_rank, rank=rank, choose_best=choose_best, num_cores=num_cores).fit_transform(cov)
 659            logging.info("Iterative SVD --- %s seconds ---" % (time.time() - start_time))
 660        else:
 661            cov_complete = cov
 662        return cov_complete
 663
 664    def _weighted_cov_pca(self, X_incomplete, weights, covariance_matrix_file, n_components):
 665        start_time = time.time()
 666        X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 667        S, _, _ = self._cov(X_incomplete)
 668        logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
 669        weights_normalized = weights / weights.sum()
 670        S = self._demean(S, weights_normalized)
 671        W = np.diag(weights)
 672        WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W))
 673        if covariance_matrix_file:
 674            np.save(covariance_matrix_file, S.data)
 675        svd = TruncatedSVD(n_components, algorithm="arpack")
 676        svd.fit(WSW)
 677        U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T
 678        D = np.diag(np.sqrt(svd.singular_values_))
 679        T = np.matmul(U, D)
 680        total_var = np.trace(S)
 681        for i in range(n_components):
 682            pc_percentvar = 100 * svd.singular_values_[i] / total_var
 683            logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
 684        return T
 685
 686    @staticmethod
 687    def _create_validation_mask(X_incomplete, percent_inds):
 688        np.random.seed(1)
 689        masked_rows = np.isnan(X_incomplete).any(axis=1)
 690        masked_inds = np.flatnonzero(masked_rows)
 691        X_masked = X_incomplete[masked_rows] 
 692        percent_masked = 100 * np.isnan(X_masked).sum() / (X_masked.shape[0] * X_masked.shape[1])
 693        unmasked_rows = ~masked_rows
 694        X_unmasked = X_incomplete[unmasked_rows]
 695        masked_rows = np.random.choice(range(X_unmasked.shape[0]), size=int(X_unmasked.shape[0] * percent_inds / 100), replace=False)
 696        X_masked_rows = X_unmasked[masked_rows,:]
 697        mask = np.zeros(X_masked_rows.shape[0] * X_masked_rows.shape[1], dtype=np.int8)
 698        mask[:int(X_masked_rows.shape[0] * X_masked_rows.shape[1] * percent_masked / 100)] = 1
 699        np.random.shuffle(mask)
 700        mask = mask.astype(bool)
 701        mask = mask.reshape(X_masked_rows.shape)
 702        X_masked_rows[mask] = np.nan
 703        X_unmasked[masked_rows] = X_masked_rows
 704        X_incomplete[unmasked_rows] = X_unmasked
 705        masked_rows_new = np.isnan(X_incomplete).any(axis=1)
 706        masked_inds_new = np.flatnonzero(masked_rows_new)
 707        masked_inds_val = sorted(list(set(masked_inds_new) - set(masked_inds)))
 708        return X_incomplete, masked_inds_val
 709
 710    @staticmethod    
 711    def _create_cov_mask(cov, strength_mat, percent_vals_masked):
 712        cov_flattened = cov.reshape(-1).copy()
 713        num_vals_masked = int(percent_vals_masked * len(cov_flattened) / 100)
 714        pos_masked = strength_mat.reshape(-1).argsort()[:num_vals_masked]
 715        cov_flattened[pos_masked] = np.nan
 716        cov_masked = cov_flattened.reshape(cov.shape)
 717        return cov_masked
 718
 719    def _regularized_optimization_ils(self, X_incomplete, covariance_matrix_file, n_components):
 720        def run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file):
 721            start_time = time.time()
 722            X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 723            has_missing = np.isnan(X_incomplete.data).sum() > 0
 724            S, _, strength_mat = self._cov(X_incomplete)
 725            logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
 726            S = self._demean(S, np.ones(S.shape[0]) / S.shape[0])
 727            if has_missing: 
 728                logging.info("Starting matrix completion. This will take a few minutes...")
 729                start_time = time.time()
 730                percent_inds_val = 20 # Percent of unmasked individuals to be masked for cross-validation 
 731                X_incomplete, masked_inds_val = self._create_validation_mask(X_incomplete.data, percent_inds_val) # masked_inds_val is the list of indices of the individuals masked for validation
 732                X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 733                S_prime, _, w_mat_prime = self._cov(X_incomplete)
 734                del X_incomplete
 735                gc.collect()
 736                S_prime = self._demean(S_prime, np.ones(S_prime.shape[0]) / S_prime.shape[0])
 737                S_robust, lam = self.matrix_completion(S, strength_mat, S_prime, w_mat_prime, lams=None, method="NN", 
 738                                                cv_inds=masked_inds_val)
 739                logging.info(f"Covariance Matrix --- %{time.time() - start_time:.2}s seconds ---")
 740            else:
 741                S_robust = S.copy()
 742            if covariance_matrix_file:
 743                np.save(covariance_matrix_file, S.data)
 744                if has_missing:
 745                    base, ext = os.path.splitext(covariance_matrix_file)
 746                    np.save(f"{base}_completed_{lam}{ext}", S_robust.data)
 747            return S, S_robust
 748
 749        def _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components):
 750            _, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)))
 751            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
 752            missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0)
 753            missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True)
 754            svd = TruncatedSVD(n_components, algorithm="arpack")
 755            svd.fit(S_robust)
 756            U = svd.components_.T
 757            U_nonmissing = U[nonmissing]
 758            D_nonmissing = np.diag(np.sqrt(svd.singular_values_))
 759            T = np.zeros((X_incomplete.shape[0], n_components))
 760            T_filled = np.matmul(U_nonmissing, D_nonmissing)
 761            i_filled = nonmissing
 762            T[nonmissing] = T_filled
 763            for i in missing_sorted:
 764                S_i = S[i][i_filled]
 765                W_i = np.diag(strength_mat[i][i_filled])
 766                A = np.matmul(W_i, T_filled)
 767                b = np.matmul(W_i, S_i)
 768                t = np.linalg.lstsq(A, b, rcond=None)[0]
 769                T[i] = t
 770                i_filled = np.append(i_filled, i)
 771                T_filled = np.append(T_filled, [t], axis=0)
 772            total_var = np.trace(S)
 773            for i in range(n_components):
 774                pc_percentvar = 100 * svd.singular_values_[i] / total_var
 775                logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
 776            return T
 777        
 778        S, S_robust = run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file)
 779        T = _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components)
 780        return T
 781
 782    def _cov_matrix_imputation(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components):
 783        start_time = time.time()
 784        X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 785        S, strength_vec, strength_mat = self._cov(X_incomplete)
 786        logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
 787        S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked)
 788        S = self._iterative_svd_matrix_completion(S_masked)
 789        weights = np.ones(len(strength_vec))
 790        weights_normalized = weights / weights.sum()
 791        S = self._demean(S, weights_normalized)
 792        W = np.diag(weights)
 793        WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W))
 794        if covariance_matrix_file:
 795            np.save(covariance_matrix_file, S.data)
 796        svd = TruncatedSVD(n_components, algorithm="arpack")
 797        svd.fit(WSW)
 798        U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T
 799        D = np.diag(np.sqrt(svd.singular_values_))
 800        T = np.matmul(U, D)
 801        total_var = np.trace(S)
 802        for i in range(n_components):
 803            pc_percentvar = 100 * svd.singular_values_[i] / total_var
 804            logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
 805        return T
 806
 807    def _cov_matrix_imputation_ils(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components):
 808        
 809        def run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file):
 810            start_time = time.time()
 811            X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))
 812            S, strength_vec, strength_mat = self._cov(X_incomplete)
 813            logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
 814            S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked)
 815            S = self._iterative_svd_matrix_completion(S_masked)
 816            weights = np.ones(len(strength_vec))
 817            weights_normalized = weights / weights.sum()
 818            S = self._demean(S, weights_normalized)
 819            W = np.diag(weights)
 820            if covariance_matrix_file:
 821                np.save(covariance_matrix_file, S.data)
 822            return S, W
 823
 824        def compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components):
 825            S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)))
 826            S = self._demean(S, np.ones(S.shape[0]) / S.shape[0])
 827            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
 828            missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0)
 829            missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True)
 830            svd = TruncatedSVD(n_components, algorithm="arpack")
 831            WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W))
 832            svd.fit(WSW)
 833            U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T
 834            U_nonmissing = U[nonmissing]
 835            D_nonmissing = np.diag(np.sqrt(svd.singular_values_))
 836            T = np.zeros((X_incomplete.shape[0], n_components))
 837            T_filled = np.matmul(U_nonmissing, D_nonmissing)
 838            i_filled = nonmissing
 839            T[nonmissing] = T_filled
 840            for i in missing_sorted:
 841                S_i = S[i][i_filled]
 842                W_i = np.diag(strength_mat[i][i_filled])
 843                A = np.matmul(W_i, T_filled)
 844                b = np.matmul(W_i, S_i)
 845                t = np.linalg.lstsq(A, b, rcond=None)[0]
 846                T[i] = t
 847                i_filled = np.append(i_filled, i)
 848                T_filled = np.append(T_filled, [t], axis=0)
 849            total_var = np.trace(S)
 850            for i in range(n_components):
 851                pc_percentvar = 100 * svd.singular_values_[i] / total_var
 852                logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
 853            return T
 854        
 855        S, W = run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file)
 856        T = compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components)
 857        return T
 858
 859    def _nonmissing_pca_ils(self, X_incomplete, covariance_matrix_file, n_components):
 860    
 861        def run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file):
 862            start_time = time.time()
 863            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
 864            X_incomplete_nonmissing = X_incomplete[nonmissing]
 865            X_incomplete_nonmissing = np.ma.array(X_incomplete_nonmissing, mask=np.isnan(X_incomplete_nonmissing))
 866            S_nonmissing, _, _ = self._cov(X_incomplete_nonmissing)
 867            logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time))
 868            S_nonmissing = self._demean(S_nonmissing, np.ones(S_nonmissing.shape[0]) / S_nonmissing.shape[0])    
 869            if covariance_matrix_file:
 870                np.save(covariance_matrix_file, S_nonmissing.data)
 871            return S_nonmissing
 872        
 873        def compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components):
 874            S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)))
 875            S = self._demean(S, np.ones(S.shape[0]) / S.shape[0])
 876            nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0)
 877            missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0)
 878            missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True)
 879            svd = TruncatedSVD(n_components, algorithm="arpack")
 880            svd.fit(S_nonmissing)
 881            U_nonmissing = svd.components_.T
 882            D_nonmissing = np.diag(np.sqrt(svd.singular_values_))
 883            T = np.zeros((X_incomplete.shape[0], n_components))
 884            T_filled = np.matmul(U_nonmissing, D_nonmissing)
 885            i_filled = nonmissing
 886            T[nonmissing] = T_filled
 887            for i in missing_sorted:
 888                S_i = S[i][i_filled]
 889                W_i = np.diag(strength_mat[i][i_filled])
 890                A = np.matmul(W_i, T_filled)
 891                b = np.matmul(W_i, S_i)
 892                t = np.linalg.lstsq(A, b, rcond=None)[0]
 893                T[i] = t
 894                i_filled = np.append(i_filled, i)
 895                T_filled = np.append(T_filled, [t], axis=0)
 896            total_var = np.trace(S)
 897            for i in range(n_components):
 898                pc_percentvar = 100 * svd.singular_values_[i] / total_var
 899                logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar)
 900            return T
 901        
 902        S_nonmissing = run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file)
 903        T = compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components)
 904        return T
 905
 906    def _run_cov_matrix(self, X_incomplete, weights):
 907        """
 908        Runs the specified PCA method on the incomplete data.
 909        """
 910        if self.method == "weighted_cov_pca":
 911            return self._weighted_cov_pca(X_incomplete, weights, self.covariance_matrix_file, self.n_components)
 912        elif self.method == "regularized_optimization_ils":
 913            return self._regularized_optimization_ils(X_incomplete, self.covariance_matrix_file, self.n_components)
 914        elif self.method == "cov_matrix_imputation":
 915            return self._cov_matrix_imputation(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components)
 916        elif self.method == "cov_matrix_imputation_ils":
 917            return self._cov_matrix_imputation_ils(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components)
 918        elif self.method == "nonmissing_pca_ils":
 919            return self._nonmissing_pca_ils(X_incomplete, self.covariance_matrix_file, self.n_components)
 920        else:
 921            raise ValueError(f"Unsupported method: {self.method}")
 922
 923    def fit_transform(
 924            self,
 925            snpobj: Optional['SNPObject'] = None, 
 926            laiobj: Optional['LocalAncestryObject'] = None,
 927            labels_file: Optional[str] = None,
 928            ancestry: Optional[str] = None,
 929            average_strands: Optional[bool] = None
 930        ) -> np.ndarray:
 931        """
 932        Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data.
 933        
 934        This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying 
 935        masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these 
 936        masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality 
 937        while handling missing data as specified.
 938
 939        Args:
 940            snpobj (SNPObject, optional): 
 941                A SNPObject instance.
 942            laiobj (LAIObject, optional): 
 943                A LAIObject instance.
 944            labels_file (str, optional): 
 945                Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 
 946                column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 
 947                weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 
 948                combined into groups, with respective weights.
 949            ancestry (str, optional): 
 950                Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
 951            average_strands (bool, optional): 
 952                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
 953                If None, defaults to `self.average_strands`.
 954
 955        Returns:
 956            **array of shape (n_samples, n_components):** 
 957                The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`.
 958        """
 959        if snpobj is None:
 960            snpobj = self.snpobj
 961        if laiobj is None:
 962            laiobj = self.laiobj
 963        if labels_file is None:
 964            labels_file = self.labels_file
 965        if ancestry is None:
 966            ancestry = self.ancestry
 967        if average_strands is None:
 968            average_strands = self.average_strands
 969        
 970        if self.load_masks:
 971            masks, rs_id_list, ind_id_list, _, weights = self._load_mask_file()
 972        else:
 973            masks, rs_id_list, ind_id_list = array_process(
 974                self.snpobj,
 975                self.laiobj,
 976                self.average_strands,
 977                self.prob_thresh,
 978                self.is_masked,
 979                self.rsid_or_chrompos
 980            )
 981
 982            masks, ind_id_list, _, weights = process_labels_weights(
 983                self.labels_file,
 984                masks,
 985                rs_id_list,
 986                ind_id_list,
 987                self.average_strands,
 988                self.ancestry,
 989                self.min_percent_snps,
 990                self.groups_to_remove,
 991                self.is_weighted,
 992                self.save_masks,
 993                self.masks_file
 994            )
 995
 996        X_incomplete, _, _ = self._process_masks(masks, rs_id_list, ind_id_list)
 997
 998        # Call run_cov_matrix with the specified method
 999        self.X_new_ = self._run_cov_matrix(
1000            X_incomplete,
1001            weights
1002        )
1003
1004        self.haplotypes_ = ind_id_list
1005
1006        return self.X_new_

A class for missing data principal component analysis (mdPCA).

This class supports both separate and averaged strand processing for SNP data. If the snpobj, laiobj, labels_file, and ancestry parameters are all provided during instantiation, the fit_transform method will be automatically called, applying the specified mdPCA method to transform the data upon instantiation.

mdPCA( method: str = 'weighted_cov_pca', snpobj: Optional[snputils.snp.genobj.SNPObject] = None, laiobj: Optional[snputils.ancestry.genobj.LocalAncestryObject] = None, labels_file: Optional[str] = None, ancestry: Optional[str] = None, is_masked: bool = True, prob_thresh: float = 0, average_strands: bool = False, is_weighted: bool = False, groups_to_remove: Dict[int, List[str]] = {}, min_percent_snps: float = 4, save_masks: bool = False, load_masks: bool = False, masks_file: Union[str, pathlib._local.Path] = 'masks.npz', output_file: Union[str, pathlib._local.Path] = 'output.tsv', covariance_matrix_file: Optional[str] = None, n_components: int = 2, rsid_or_chrompos: int = 2, percent_vals_masked: float = 0)
 28    def __init__(
 29        self,
 30        method: str = 'weighted_cov_pca',
 31        snpobj: Optional['SNPObject'] = None,
 32        laiobj: Optional['LocalAncestryObject'] = None,
 33        labels_file: Optional[str] = None,
 34        ancestry: Optional[str] = None,
 35        is_masked: bool = True,
 36        prob_thresh: float = 0,
 37        average_strands: bool = False,
 38        is_weighted: bool = False,
 39        groups_to_remove: Dict[int, List[str]] = {},
 40        min_percent_snps: float = 4,
 41        save_masks: bool = False,
 42        load_masks: bool = False,
 43        masks_file: Union[str, pathlib.Path] = 'masks.npz',
 44        output_file: Union[str, pathlib.Path] = 'output.tsv',
 45        covariance_matrix_file: Optional[str] = None,
 46        n_components: int = 2,
 47        rsid_or_chrompos: int = 2,
 48        percent_vals_masked: float = 0
 49    ):
 50        """
 51        Args:
 52            method (str, default='weighted_cov_pca'): 
 53                The PCA method to use for dimensionality reduction. Options include:
 54                - `'weighted_cov_pca'`: 
 55                    Simple covariance-based PCA, weighted by sample strengths.
 56                - `'regularized_optimization_ils'`: 
 57                    Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of 
 58                    missing samples using the original covariance matrix (considering only relevant elements not missing in 
 59                    the original covariance matrix for those samples).
 60                - `'cov_matrix_imputation'`: 
 61                    Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values 
 62                    using the Iterative SVD imputation method.
 63                - `'cov_matrix_imputation_ils'`: 
 64                    The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space 
 65                    given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just 
 66                    as done in 'regularized_optimization_ils'.
 67                - `'nonmissing_pca_ils'`: 
 68                    The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto 
 69                    the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as 
 70                    done in 'regularized_optimization_ils'.
 71            snpobj (SNPObject, optional): 
 72                A SNPObject instance.
 73            laiobj (LAIObject, optional): 
 74                A LAIObject instance.
 75            labels_file (str, optional): 
 76                Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 
 77                column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 
 78                weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 
 79                combined into groups, with respective weights.
 80            ancestry (str, optional): 
 81                Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`.
 82            is_masked (bool, default=True): 
 83                True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
 84            prob_thresh (float, default=0): 
 85                Minimum probability threshold for a SNP to belong to an ancestry.
 86            average_strands (bool, default=False): 
 87                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
 88            is_weighted (bool, default=False): 
 89                True if weights are provided in the labels file, or False otherwise.
 90            groups_to_remove (dict of int to list of str, default={}): 
 91                Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 
 92                lists of groups to remove for each array.
 93                Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`.
 94            min_percent_snps (float, default=4): 
 95                Minimum percentage of SNPs that must be known for an individual to be included in the analysis.
 96                All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
 97            save_masks (bool, default=False): 
 98                True if the masked matrices are to be saved in a `.npz` file, or False otherwise.
 99            load_masks (bool, default=False): 
100                True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise.
101            masks_file (str or pathlib.Path, default='masks.npz'): 
102                Path to the `.npz` file used for saving/loading masked matrices.
103            output_file (str or pathlib.Path, default='output.tsv'): 
104                Path to the output `.tsv` file where mdPCA results are saved.
105            covariance_matrix_file (str, optional): 
106                Path to save the covariance matrix file in `.npy` format. If None, the covariance matrix is not saved. Default is None.
107            n_components (int, default=2): 
108                The number of principal components.
109            rsid_or_chrompos (int, default=2): 
110                Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`.
111            percent_vals_masked (float, default=0): 
112                Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 
113                `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`.
114        """
115        self.__snpobj = snpobj
116        self.__laiobj = laiobj
117        self.__labels_file = labels_file
118        self.__ancestry = ancestry
119        self.__method = method
120        self.__is_masked = is_masked
121        self.__prob_thresh = prob_thresh
122        self.__average_strands = average_strands
123        self.__is_weighted = is_weighted
124        self.__groups_to_remove = groups_to_remove
125        self.__min_percent_snps = min_percent_snps
126        self.__save_masks = save_masks
127        self.__load_masks = load_masks
128        self.__masks_file = masks_file
129        self.__output_file = output_file
130        self.__covariance_matrix_file = covariance_matrix_file
131        self.__n_components = n_components
132        self.__rsid_or_chrompos = rsid_or_chrompos
133        self.__percent_vals_masked = percent_vals_masked
134        self.__X_new_ = None  # Store transformed SNP data
135        self.__haplotypes_ = None  # Store haplotypes after filtering if min_percent_snps > 0
136        self.__samples_ = None  # Store samples after filtering if min_percent_snps > 0
137
138        # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided
139        if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None:
140            self.fit_transform(snpobj, laiobj, labels_file, ancestry)
Arguments:
  • method (str, default='weighted_cov_pca'): The PCA method to use for dimensionality reduction. Options include:
    • 'weighted_cov_pca': Simple covariance-based PCA, weighted by sample strengths.
    • 'regularized_optimization_ils': Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of missing samples using the original covariance matrix (considering only relevant elements not missing in the original covariance matrix for those samples).
    • 'cov_matrix_imputation': Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values using the Iterative SVD imputation method.
    • 'cov_matrix_imputation_ils': The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just as done in 'regularized_optimization_ils'.
    • 'nonmissing_pca_ils': The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as done in 'regularized_optimization_ils'.
  • snpobj (SNPObject, optional): A SNPObject instance.
  • laiobj (LAIObject, optional): A LAIObject instance.
  • labels_file (str, optional): Path to the labels file in .tsv format. The first column, indID, contains the individual identifiers, and the second column, label, specifies the groups for all individuals. If is_weighted=True, a weight column with individual weights is required. Optionally, combination and combination_weight columns can specify sets of individuals to be combined into groups, with respective weights.
  • ancestry (str, optional): Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
  • is_masked (bool, default=True): True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
  • prob_thresh (float, default=0): Minimum probability threshold for a SNP to belong to an ancestry.
  • average_strands (bool, default=False): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
  • is_weighted (bool, default=False): True if weights are provided in the labels file, or False otherwise.
  • groups_to_remove (dict of int to list of str, default={}): Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are lists of groups to remove for each array. Example: {1: ['group1', 'group2'], 2: [], 3: ['group3']}.
  • min_percent_snps (float, default=4): Minimum percentage of SNPs that must be known for an individual to be included in the analysis. All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
  • save_masks (bool, default=False): True if the masked matrices are to be saved in a .npz file, or False otherwise.
  • load_masks (bool, default=False): True if the masked matrices are to be loaded from a pre-existing .npz file specified by masks_file, or False otherwise.
  • masks_file (str or pathlib.Path, default='masks.npz'): Path to the .npz file used for saving/loading masked matrices.
  • output_file (str or pathlib.Path, default='output.tsv'): Path to the output .tsv file where mdPCA results are saved.
  • covariance_matrix_file (str, optional): Path to save the covariance matrix file in .npy format. If None, the covariance matrix is not saved. Default is None.
  • n_components (int, default=2): The number of principal components.
  • rsid_or_chrompos (int, default=2): Format indicator for SNP IDs in the SNP data. Use 1 for rsID format or 2 for chromosome_position.
  • percent_vals_masked (float, default=0): Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if method is 'cov_matrix_imputation' or 'cov_matrix_imputation_ils'.
method: str
162    @property
163    def method(self) -> str:
164        """
165        Retrieve `method`.
166        
167        Returns:
168            **str:** The PCA method to use for dimensionality reduction.
169        """
170        return self.__method

Retrieve method.

Returns:

str: The PCA method to use for dimensionality reduction.

snpobj: Optional[snputils.snp.genobj.SNPObject]
179    @property
180    def snpobj(self) -> Optional['SNPObject']:
181        """
182        Retrieve `snpobj`.
183        
184        Returns:
185            **SNPObject:** A SNPObject instance.
186        """
187        return self.__snpobj

Retrieve snpobj.

Returns:

SNPObject: A SNPObject instance.

laiobj: Optional[snputils.ancestry.genobj.LocalAncestryObject]
196    @property
197    def laiobj(self) -> Optional['LocalAncestryObject']:
198        """
199        Retrieve `laiobj`.
200        
201        Returns:
202            **LocalAncestryObject:** A LocalAncestryObject instance.
203        """
204        return self.__laiobj

Retrieve laiobj.

Returns:

LocalAncestryObject: A LocalAncestryObject instance.

labels_file: Optional[str]
213    @property
214    def labels_file(self) -> Optional[str]:
215        """
216        Retrieve `labels_file`.
217        
218        Returns:
219            **str:** Path to the labels file in `.tsv` format.
220        """
221        return self.__labels_file

Retrieve labels_file.

Returns:

str: Path to the labels file in .tsv format.

ancestry: Optional[str]
230    @property
231    def ancestry(self) -> Optional[str]:
232        """
233        Retrieve `ancestry`.
234        
235        Returns:
236            **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
237        """
238        return self.__ancestry

Retrieve ancestry.

Returns:

str: Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.

is_masked: bool
247    @property
248    def is_masked(self) -> bool:
249        """
250        Retrieve `is_masked`.
251        
252        Returns:
253            **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
254        """
255        return self.__is_masked

Retrieve is_masked.

Returns:

bool: True if an ancestry file is passed for ancestry-specific masking, or False otherwise.

prob_thresh: float
264    @property
265    def prob_thresh(self) -> float:
266        """
267        Retrieve `prob_thresh`.
268        
269        Returns:
270            **float:** Minimum probability threshold for a SNP to belong to an ancestry.
271        """
272        return self.__prob_thresh

Retrieve prob_thresh.

Returns:

float: Minimum probability threshold for a SNP to belong to an ancestry.

average_strands: bool
280    @property
281    def average_strands(self) -> bool:
282        """
283        Retrieve `average_strands`.
284        
285        Returns:
286            **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
287        """
288        return self.__average_strands

Retrieve average_strands.

Returns:

bool: True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.

is_weighted: bool
297    @property
298    def is_weighted(self) -> bool:
299        """
300        Retrieve `is_weighted`.
301        
302        Returns:
303            **bool:** True if weights are provided in the labels file, or False otherwise.
304        """
305        return self.__is_weighted

Retrieve is_weighted.

Returns:

bool: True if weights are provided in the labels file, or False otherwise.

groups_to_remove: Dict[int, List[str]]
314    @property
315    def groups_to_remove(self) -> Dict[int, List[str]]:
316        """
317        Retrieve `groups_to_remove`.
318        
319        Returns:
320            **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 
321                lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`.
322        """
323        return self.__groups_to_remove

Retrieve groups_to_remove.

Returns:

dict of int to list of str: Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are lists of groups to remove for each array. Example: {1: ['group1', 'group2'], 2: [], 3: ['group3']}.

min_percent_snps: float
332    @property
333    def min_percent_snps(self) -> float:
334        """
335        Retrieve `min_percent_snps`.
336        
337        Returns:
338            **float:** 
339                Minimum percentage of SNPs that must be known for an individual to be included in the analysis.
340                All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
341        """
342        return self.__min_percent_snps

Retrieve min_percent_snps.

Returns:

float: Minimum percentage of SNPs that must be known for an individual to be included in the analysis. All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.

save_masks: bool
351    @property
352    def save_masks(self) -> bool:
353        """
354        Retrieve `save_masks`.
355        
356        Returns:
357            **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise.
358        """
359        return self.__save_masks

Retrieve save_masks.

Returns:

bool: True if the masked matrices are to be saved in a .npz file, or False otherwise.

load_masks: bool
368    @property
369    def load_masks(self) -> bool:
370        """
371        Retrieve `load_masks`.
372        
373        Returns:
374            **bool:** 
375                True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 
376                by `masks_file`, or False otherwise.
377        """
378        return self.__load_masks

Retrieve load_masks.

Returns:

bool: True if the masked matrices are to be loaded from a pre-existing .npz file specified by masks_file, or False otherwise.

masks_file: Union[str, pathlib._local.Path]
387    @property
388    def masks_file(self) -> Union[str, pathlib.Path]:
389        """
390        Retrieve `masks_file`.
391        
392        Returns:
393            **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices.
394        """
395        return self.__masks_file

Retrieve masks_file.

Returns:

str or pathlib.Path: Path to the .npz file used for saving/loading masked matrices.

output_file: Union[str, pathlib._local.Path]
404    @property
405    def output_file(self) -> Union[str, pathlib.Path]:
406        """
407        Retrieve `output_file`.
408        
409        Returns:
410            **str or pathlib.Path:** Path to the output `.tsv` file where mdPCA results are saved.
411        """
412        return self.__output_file

Retrieve output_file.

Returns:

str or pathlib.Path: Path to the output .tsv file where mdPCA results are saved.

covariance_matrix_file: Optional[str]
421    @property
422    def covariance_matrix_file(self) -> Optional[str]:
423        """
424        Retrieve `covariance_matrix_file`.
425        
426        Returns:
427            **str:** Path to save the covariance matrix file in `.npy` format.
428        """
429        return self.__covariance_matrix_file

Retrieve covariance_matrix_file.

Returns:

str: Path to save the covariance matrix file in .npy format.

n_components: int
438    @property
439    def n_components(self) -> int:
440        """
441        Retrieve `n_components`.
442        
443        Returns:
444            **int:** The number of principal components.
445        """
446        return self.__n_components

Retrieve n_components.

Returns:

int: The number of principal components.

rsid_or_chrompos: int
455    @property
456    def rsid_or_chrompos(self) -> int:
457        """
458        Retrieve `rsid_or_chrompos`.
459        
460        Returns:
461            **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`.
462        """
463        return self.__rsid_or_chrompos

Retrieve rsid_or_chrompos.

Returns:

int: Format indicator for SNP IDs in the SNP data. Use 1 for rsID format or 2 for chromosome_position.

percent_vals_masked: float
472    @property
473    def percent_vals_masked(self) -> float:
474        """
475        Retrieve `percent_vals_masked`.
476        
477        Returns:
478            **float:** 
479                Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 
480                `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`.
481        """
482        return self.__percent_vals_masked

Retrieve percent_vals_masked.

Returns:

float: Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if method is 'cov_matrix_imputation' or 'cov_matrix_imputation_ils'.

X_new_: Optional[numpy.ndarray]
491    @property
492    def X_new_(self) -> Optional[np.ndarray]:
493        """
494        Retrieve `X_new_`.
495
496        Returns:
497            **array of shape (n_samples, n_components):** 
498                The transformed SNP data projected onto the `n_components` principal components.
499                n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 
500                (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 
501                `(n_samples * 2, n_components)`.
502        """
503        return self.__X_new_

Retrieve X_new_.

Returns:

array of shape (n_samples, n_components): The transformed SNP data projected onto the n_components principal components. n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied (min_percent_snps > 0). For diploid individuals without filtering, the shape is (n_samples * 2, n_components).

haplotypes_: Optional[List[str]]
512    @property
513    def haplotypes_(self) -> Optional[List[str]]:
514        """
515        Retrieve `haplotypes_`.
516
517        Returns:
518            list of str:
519                A list of unique haplotype identifiers.
520        """
521        if isinstance(self.__haplotypes_, np.ndarray):
522            return self.__haplotypes_.ravel().tolist()  # Flatten and convert NumPy array to a list
523        elif isinstance(self.__haplotypes_, list):
524            if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray):
525                return self.__haplotypes_[0].ravel().tolist()  # Handle list containing a single array
526            return self.__haplotypes_  # Already a flat list
527        elif self.__haplotypes_ is None:
528            return None  # If no haplotypes are set
529        else:
530            raise TypeError("`haplotypes_` must be a list or a NumPy array.")

Retrieve haplotypes_.

Returns:

list of str: A list of unique haplotype identifiers.

samples_: Optional[List[str]]
547    @property
548    def samples_(self) -> Optional[List[str]]:
549        """
550        Retrieve `samples_`.
551
552        Returns:
553            list of str:
554                A list of sample identifiers based on `haplotypes_` and `average_strands`.
555        """
556        haplotypes = self.haplotypes_
557        if haplotypes is None:
558            return None
559        if self.__average_strands:
560            return haplotypes
561        else:
562            return [x[:-2] for x in haplotypes]

Retrieve samples_.

Returns:

list of str: A list of sample identifiers based on haplotypes_ and average_strands.

n_haplotypes: Optional[int]
564    @property
565    def n_haplotypes(self) -> Optional[int]:
566        """
567        Retrieve `n_haplotypes`.
568
569        Returns:
570            **int:**
571                The total number of haplotypes, potentially reduced if filtering is applied 
572                (`min_percent_snps > 0`).
573        """
574        return len(self.haplotypes_)

Retrieve n_haplotypes.

Returns:

int: The total number of haplotypes, potentially reduced if filtering is applied (min_percent_snps > 0).

n_samples: Optional[int]
576    @property
577    def n_samples(self) -> Optional[int]:
578        """
579        Retrieve `n_samples`.
580
581        Returns:
582            **int:**
583                The total number of samples, potentially reduced if filtering is applied 
584                (`min_percent_snps > 0`).
585        """
586        return len(np.unique(self.samples_))

Retrieve n_samples.

Returns:

int: The total number of samples, potentially reduced if filtering is applied (min_percent_snps > 0).

def copy(self) -> mdPCA:
588    def copy(self) -> 'mdPCA':
589        """
590        Create and return a copy of `self`.
591
592        Returns:
593            **mdPCA:** 
594                A new instance of the current object.
595        """
596        return copy.copy(self)

Create and return a copy of self.

Returns:

mdPCA: A new instance of the current object.

def fit_transform( self, snpobj: Optional[snputils.snp.genobj.SNPObject] = None, laiobj: Optional[snputils.ancestry.genobj.LocalAncestryObject] = None, labels_file: Optional[str] = None, ancestry: Optional[str] = None, average_strands: Optional[bool] = None) -> numpy.ndarray:
 923    def fit_transform(
 924            self,
 925            snpobj: Optional['SNPObject'] = None, 
 926            laiobj: Optional['LocalAncestryObject'] = None,
 927            labels_file: Optional[str] = None,
 928            ancestry: Optional[str] = None,
 929            average_strands: Optional[bool] = None
 930        ) -> np.ndarray:
 931        """
 932        Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data.
 933        
 934        This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying 
 935        masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these 
 936        masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality 
 937        while handling missing data as specified.
 938
 939        Args:
 940            snpobj (SNPObject, optional): 
 941                A SNPObject instance.
 942            laiobj (LAIObject, optional): 
 943                A LAIObject instance.
 944            labels_file (str, optional): 
 945                Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 
 946                column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 
 947                weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 
 948                combined into groups, with respective weights.
 949            ancestry (str, optional): 
 950                Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
 951            average_strands (bool, optional): 
 952                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
 953                If None, defaults to `self.average_strands`.
 954
 955        Returns:
 956            **array of shape (n_samples, n_components):** 
 957                The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`.
 958        """
 959        if snpobj is None:
 960            snpobj = self.snpobj
 961        if laiobj is None:
 962            laiobj = self.laiobj
 963        if labels_file is None:
 964            labels_file = self.labels_file
 965        if ancestry is None:
 966            ancestry = self.ancestry
 967        if average_strands is None:
 968            average_strands = self.average_strands
 969        
 970        if self.load_masks:
 971            masks, rs_id_list, ind_id_list, _, weights = self._load_mask_file()
 972        else:
 973            masks, rs_id_list, ind_id_list = array_process(
 974                self.snpobj,
 975                self.laiobj,
 976                self.average_strands,
 977                self.prob_thresh,
 978                self.is_masked,
 979                self.rsid_or_chrompos
 980            )
 981
 982            masks, ind_id_list, _, weights = process_labels_weights(
 983                self.labels_file,
 984                masks,
 985                rs_id_list,
 986                ind_id_list,
 987                self.average_strands,
 988                self.ancestry,
 989                self.min_percent_snps,
 990                self.groups_to_remove,
 991                self.is_weighted,
 992                self.save_masks,
 993                self.masks_file
 994            )
 995
 996        X_incomplete, _, _ = self._process_masks(masks, rs_id_list, ind_id_list)
 997
 998        # Call run_cov_matrix with the specified method
 999        self.X_new_ = self._run_cov_matrix(
1000            X_incomplete,
1001            weights
1002        )
1003
1004        self.haplotypes_ = ind_id_list
1005
1006        return self.X_new_

Fit the model to the SNP data stored in the provided snpobj and apply the dimensionality reduction on the same SNP data.

This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality while handling missing data as specified.

Arguments:
  • snpobj (SNPObject, optional): A SNPObject instance.
  • laiobj (LAIObject, optional): A LAIObject instance.
  • labels_file (str, optional): Path to the labels file in .tsv format. The first column, indID, contains the individual identifiers, and the second column, label, specifies the groups for all individuals. If is_weighted=True, a weight column with individual weights is required. Optionally, combination and combination_weight columns can specify sets of individuals to be combined into groups, with respective weights.
  • ancestry (str, optional): Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
  • average_strands (bool, optional): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. If None, defaults to self.average_strands.
Returns:

array of shape (n_samples, n_components): The transformed SNP data projected onto the n_components principal components, stored in self.X_new_.