snputils.processing.mdpca
1import pathlib 2import os 3import time 4import gc 5import logging 6import logging.config 7import numpy as np 8import copy 9from typing import Optional, Dict, List, Union 10from sklearn.decomposition import TruncatedSVD 11 12from snputils.snp.genobj.snpobj import SNPObject 13from snputils.ancestry.genobj.local import LocalAncestryObject 14from ._utils.gen_tools import process_calldata_gt, process_labels_weights 15from ._utils.iterative_svd import IterativeSVD 16 17 18class mdPCA: 19 """ 20 A class for performing missing data principal component analysis (mdPCA) on SNP data. 21 22 The mdPCA class focuses on genotype segments from the ancestry of interest when the `is_masked` flag is set to `True`. It offers 23 flexible processing options, allowing either separate handling of masked haplotype strands or combining (averaging) strands into a 24 single composite representation for each individual. Moreover, the analysis can be performed on individual-level data, group-level SNP 25 frequencies, or a combination of both. 26 27 If the `snpobj`, `laiobj`, `labels_file`, and `ancestry` parameters are all provided during instantiation, the `fit_transform` method 28 will be automatically called, applying the specified mdPCA method to transform the data upon instantiation. 29 """ 30 def __init__( 31 self, 32 method: str = 'weighted_cov_pca', 33 snpobj: Optional['SNPObject'] = None, 34 laiobj: Optional['LocalAncestryObject'] = None, 35 labels_file: Optional[str] = None, 36 ancestry: Optional[Union[int, str]] = None, 37 is_masked: bool = True, 38 average_strands: bool = False, 39 force_nan_incomplete_strands: bool = False, 40 is_weighted: bool = False, 41 groups_to_remove: List[str] = None, 42 min_percent_snps: float = 4, 43 group_snp_frequencies_only: bool = True, 44 save_masks: bool = False, 45 load_masks: bool = False, 46 masks_file: Union[str, pathlib.Path] = 'masks.npz', 47 output_file: Union[str, pathlib.Path] = 'output.tsv', 48 covariance_matrix_file: Optional[str] = None, 49 n_components: int = 2, 50 rsid_or_chrompos: int = 2, 51 percent_vals_masked: float = 0 52 ): 53 """ 54 Args: 55 method (str, default='weighted_cov_pca'): 56 The PCA method to use for dimensionality reduction. Options include: 57 - `'weighted_cov_pca'`: 58 Simple covariance-based PCA, weighted by sample strengths. 59 - `'regularized_optimization_ils'`: 60 Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of 61 missing samples using the original covariance matrix (considering only relevant elements not missing in 62 the original covariance matrix for those samples). 63 - `'cov_matrix_imputation'`: 64 Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values 65 using the Iterative SVD imputation method. 66 - `'cov_matrix_imputation_ils'`: 67 The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space 68 given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just 69 as done in 'regularized_optimization_ils'. 70 - `'nonmissing_pca_ils'`: 71 The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto 72 the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as 73 done in 'regularized_optimization_ils'. 74 snpobj (SNPObject, optional): 75 A SNPObject instance. 76 laiobj (LAIObject, optional): 77 A LAIObject instance. 78 labels_file (str, optional): 79 Path to a `.tsv` file with metadata on individuals, including population labels, optional weights, and groupings. 80 The `indID` column must contain unique individual identifiers matching those in `laiobj` and `snpobj` for proper alignment. 81 The `label` column assigns population groups. If `is_weighted=True`, a `weight` column must be provided, assigning a weight to 82 each individual, where those with a weight of zero are removed. Optional columns include `combination` and `combination_weight` 83 to aggregate individuals into combined groups, where SNP frequencies represent their sequences. The `combination` column assigns 84 each individual to a specific group (0 for no combination, 1 for the first group, 2 for the second, etc.). All members of a group 85 must share the same `label` and `combination_weight`. If `combination_weight` column is not provided, the combinations are 86 assigned a default weight of `1`. Individuals excluded via `groups_to_remove` or those falling below `min_percent_snps` are removed 87 from the analysis. 88 ancestry (int or str, optional): 89 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. The ancestry input can be: 90 - An integer (e.g., 0, 1, 2). 91 - A string representation of an integer (e.g., '0', '1'). 92 - A string matching one of the ancestry map values (e.g., 'Africa'). 93 is_masked (bool, default=True): 94 If `True`, applies ancestry-specific masking to the genotype matrix, retaining only genotype data 95 corresponding to the specified `ancestry`. If `False`, uses the full, unmasked genotype matrix. 96 average_strands (bool, default=False): 97 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 98 force_nan_incomplete_strands (bool): 99 If `True`, sets the result to NaN if either haplotype in a pair is NaN. 100 Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). 101 is_weighted (bool, default=False): 102 If `True`, assigns individual weights from the `weight` column in `labels_file`. Otherwise, all individuals have equal weight of `1`. 103 groups_to_remove (list of str, optional): 104 List with groups to exclude from analysis. Example: ['group1', 'group2']. 105 min_percent_snps (float, default=4): 106 Minimum percentage of SNPs that must be known for an individual and of the ancestry of interest to be included in the analysis. 107 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 108 group_snp_frequencies_only (bool, default=True): 109 If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when `is_weighted` is 110 set to True and a `combination` column is provided in the `labels_file`, meaning individuals are aggregated into groups based on their assigned 111 labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies when 112 `is_weighted` is True and a `combination` column is provided. 113 save_masks (bool, default=False): 114 True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 115 load_masks (bool, default=False): 116 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise. 117 masks_file (str or pathlib.Path, default='masks.npz'): 118 Path to the `.npz` file used for saving/loading masked matrices. 119 output_file (str or pathlib.Path, default='output.tsv'): 120 Path to the output `.tsv` file where mdPCA results are saved. 121 covariance_matrix_file (str, optional): 122 Path to save the covariance matrix file in `.npy` format. If None, the covariance matrix is not saved. Default is None. 123 n_components (int, default=2): 124 The number of principal components. 125 rsid_or_chrompos (int, default=2): 126 Format indicator for SNP IDs in `self.__X_new_`. Use 1 for `rsID` format or 2 for `chromosome_position`. 127 percent_vals_masked (float, default=0): 128 Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 129 `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`. 130 """ 131 self.__snpobj = snpobj 132 self.__laiobj = laiobj 133 self.__labels_file = labels_file 134 self.__ancestry = self._define_ancestry(ancestry, laiobj.ancestry_map) 135 self.__method = method 136 self.__is_masked = is_masked 137 self.__average_strands = average_strands 138 self.__force_nan_incomplete_strands = force_nan_incomplete_strands 139 self.__is_weighted = is_weighted 140 self.__groups_to_remove = groups_to_remove 141 self.__min_percent_snps = min_percent_snps 142 self.__group_snp_frequencies_only = group_snp_frequencies_only 143 self.__save_masks = save_masks 144 self.__load_masks = load_masks 145 self.__masks_file = masks_file 146 self.__output_file = output_file 147 self.__covariance_matrix_file = covariance_matrix_file 148 self.__n_components = n_components 149 self.__rsid_or_chrompos = rsid_or_chrompos 150 self.__percent_vals_masked = percent_vals_masked 151 self.__X_new_ = None # Store transformed SNP data 152 self.__haplotypes_ = None # Store haplotypes of X_new_ (after filtering if min_percent_snps > 0) 153 self.__samples_ = None # Store samples of X_new_ (after filtering if min_percent_snps > 0) 154 self.__variants_id_ = None # Store variants ID (after filtering SNPs not in laiobj) 155 156 # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided 157 if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None: 158 self.fit_transform(snpobj, laiobj, labels_file, ancestry) 159 160 def __getitem__(self, key): 161 """ 162 To access an attribute of the class using the square bracket notation, 163 similar to a dictionary. 164 """ 165 try: 166 return getattr(self, key) 167 except AttributeError: 168 raise KeyError(f'Invalid key: {key}') 169 170 def __setitem__(self, key, value): 171 """ 172 To set an attribute of the class using the square bracket notation, 173 similar to a dictionary. 174 """ 175 try: 176 setattr(self, key, value) 177 except AttributeError: 178 raise KeyError(f'Invalid key: {key}') 179 180 @property 181 def method(self) -> str: 182 """ 183 Retrieve `method`. 184 185 Returns: 186 **str:** The PCA method to use for dimensionality reduction. 187 """ 188 return self.__method 189 190 @method.setter 191 def method(self, x: str) -> None: 192 """ 193 Update `method`. 194 """ 195 self.__method = x 196 197 @property 198 def snpobj(self) -> Optional['SNPObject']: 199 """ 200 Retrieve `snpobj`. 201 202 Returns: 203 **SNPObject:** A SNPObject instance. 204 """ 205 return self.__snpobj 206 207 @snpobj.setter 208 def snpobj(self, x: 'SNPObject') -> None: 209 """ 210 Update `snpobj`. 211 """ 212 self.__snpobj = x 213 214 @property 215 def laiobj(self) -> Optional['LocalAncestryObject']: 216 """ 217 Retrieve `laiobj`. 218 219 Returns: 220 **LocalAncestryObject:** A LocalAncestryObject instance. 221 """ 222 return self.__laiobj 223 224 @laiobj.setter 225 def laiobj(self, x: 'LocalAncestryObject') -> None: 226 """ 227 Update `laiobj`. 228 """ 229 self.__laiobj = x 230 231 @property 232 def labels_file(self) -> Optional[str]: 233 """ 234 Retrieve `labels_file`. 235 236 Returns: 237 **str:** Path to the labels file in `.tsv` format. 238 """ 239 return self.__labels_file 240 241 @labels_file.setter 242 def labels_file(self, x: str) -> None: 243 """ 244 Update `labels_file`. 245 """ 246 self.__labels_file = x 247 248 @property 249 def ancestry(self) -> Optional[str]: 250 """ 251 Retrieve `ancestry`. 252 253 Returns: 254 **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 255 """ 256 return self.__ancestry 257 258 @ancestry.setter 259 def ancestry(self, x: str) -> None: 260 """ 261 Update `ancestry`. 262 """ 263 self.__ancestry = x 264 265 @property 266 def is_masked(self) -> bool: 267 """ 268 Retrieve `is_masked`. 269 270 Returns: 271 **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise. 272 """ 273 return self.__is_masked 274 275 @is_masked.setter 276 def is_masked(self, x: bool) -> None: 277 """ 278 Update `is_masked`. 279 """ 280 self.__is_masked = x 281 282 @property 283 def average_strands(self) -> bool: 284 """ 285 Retrieve `average_strands`. 286 287 Returns: 288 **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 289 """ 290 return self.__average_strands 291 292 @average_strands.setter 293 def average_strands(self, x: bool) -> None: 294 """ 295 Update `average_strands`. 296 """ 297 self.__average_strands = x 298 299 @property 300 def force_nan_incomplete_strands(self) -> bool: 301 """ 302 Retrieve `force_nan_incomplete_strands`. 303 304 Returns: 305 **bool**: If `True`, sets the result to NaN if either haplotype in a pair is NaN. 306 Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). 307 """ 308 return self.__force_nan_incomplete_strands 309 310 @force_nan_incomplete_strands.setter 311 def force_nan_incomplete_strands(self, x: bool) -> None: 312 """ 313 Update `force_nan_incomplete_strands`. 314 """ 315 self.__force_nan_incomplete_strands = x 316 317 @property 318 def is_weighted(self) -> bool: 319 """ 320 Retrieve `is_weighted`. 321 322 Returns: 323 **bool:** True if weights are provided in the labels file, or False otherwise. 324 """ 325 return self.__is_weighted 326 327 @is_weighted.setter 328 def is_weighted(self, x: bool) -> None: 329 """ 330 Update `is_weighted`. 331 """ 332 self.__is_weighted = x 333 334 @property 335 def groups_to_remove(self) -> Dict[int, List[str]]: 336 """ 337 Retrieve `groups_to_remove`. 338 339 Returns: 340 **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 341 lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 342 """ 343 return self.__groups_to_remove 344 345 @groups_to_remove.setter 346 def groups_to_remove(self, x: Dict[int, List[str]]) -> None: 347 """ 348 Update `groups_to_remove`. 349 """ 350 self.__groups_to_remove = x 351 352 @property 353 def min_percent_snps(self) -> float: 354 """ 355 Retrieve `min_percent_snps`. 356 357 Returns: 358 **float:** 359 Minimum percentage of SNPs that must be known for an individual to be included in the analysis. 360 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 361 """ 362 return self.__min_percent_snps 363 364 @min_percent_snps.setter 365 def min_percent_snps(self, x: float) -> None: 366 """ 367 Update `min_percent_snps`. 368 """ 369 self.__min_percent_snps = x 370 371 @property 372 def group_snp_frequencies_only(self) -> bool: 373 """ 374 Retrieve `group_snp_frequencies_only`. 375 376 Returns: 377 **bool:** 378 If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies 379 when `is_weighted` is set to True and a `combination` column is provided in the `labels_file`, meaning individuals are 380 aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone 381 or on both individual-level and group-level SNP frequencies when `is_weighted` is True and a `combination` column is provided. 382 """ 383 return self.__group_snp_frequencies_only 384 385 @group_snp_frequencies_only.setter 386 def group_snp_frequencies_only(self, x: bool) -> None: 387 """ 388 Update `group_snp_frequencies_only`. 389 """ 390 self.__group_snp_frequencies_only = x 391 392 @property 393 def save_masks(self) -> bool: 394 """ 395 Retrieve `save_masks`. 396 397 Returns: 398 **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 399 """ 400 return self.__save_masks 401 402 @save_masks.setter 403 def save_masks(self, x: bool) -> None: 404 """ 405 Update `save_masks`. 406 """ 407 self.__save_masks = x 408 409 @property 410 def load_masks(self) -> bool: 411 """ 412 Retrieve `load_masks`. 413 414 Returns: 415 **bool:** 416 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 417 by `masks_file`, or False otherwise. 418 """ 419 return self.__load_masks 420 421 @load_masks.setter 422 def load_masks(self, x: bool) -> None: 423 """ 424 Update `load_masks`. 425 """ 426 self.__load_masks = x 427 428 @property 429 def masks_file(self) -> Union[str, pathlib.Path]: 430 """ 431 Retrieve `masks_file`. 432 433 Returns: 434 **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices. 435 """ 436 return self.__masks_file 437 438 @masks_file.setter 439 def masks_file(self, x: Union[str, pathlib.Path]) -> None: 440 """ 441 Update `masks_file`. 442 """ 443 self.__masks_file = x 444 445 @property 446 def output_file(self) -> Union[str, pathlib.Path]: 447 """ 448 Retrieve `output_file`. 449 450 Returns: 451 **str or pathlib.Path:** Path to the output `.tsv` file where mdPCA results are saved. 452 """ 453 return self.__output_file 454 455 @output_file.setter 456 def output_file(self, x: Union[str, pathlib.Path]) -> None: 457 """ 458 Update `output_file`. 459 """ 460 self.__output_file = x 461 462 @property 463 def covariance_matrix_file(self) -> Optional[str]: 464 """ 465 Retrieve `covariance_matrix_file`. 466 467 Returns: 468 **str:** Path to save the covariance matrix file in `.npy` format. 469 """ 470 return self.__covariance_matrix_file 471 472 @covariance_matrix_file.setter 473 def covariance_matrix_file(self, x: Optional[str]) -> None: 474 """ 475 Update `covariance_matrix_file`. 476 """ 477 self.__covariance_matrix_file = x 478 479 @property 480 def n_components(self) -> int: 481 """ 482 Retrieve `n_components`. 483 484 Returns: 485 **int:** The number of principal components. 486 """ 487 return self.__n_components 488 489 @n_components.setter 490 def n_components(self, x: int) -> None: 491 """ 492 Update `n_components`. 493 """ 494 self.__n_components = x 495 496 @property 497 def rsid_or_chrompos(self) -> int: 498 """ 499 Retrieve `rsid_or_chrompos`. 500 501 Returns: 502 **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 503 """ 504 return self.__rsid_or_chrompos 505 506 @rsid_or_chrompos.setter 507 def rsid_or_chrompos(self, x: int) -> None: 508 """ 509 Update `rsid_or_chrompos`. 510 """ 511 self.__rsid_or_chrompos = x 512 513 @property 514 def percent_vals_masked(self) -> float: 515 """ 516 Retrieve `percent_vals_masked`. 517 518 Returns: 519 **float:** 520 Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 521 `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`. 522 """ 523 return self.__percent_vals_masked 524 525 @percent_vals_masked.setter 526 def percent_vals_masked(self, x: float) -> None: 527 """ 528 Update `percent_vals_masked`. 529 """ 530 self.__percent_vals_masked = x 531 532 @property 533 def X_new_(self) -> Optional[np.ndarray]: 534 """ 535 Retrieve `X_new_`. 536 537 Returns: 538 **array of shape (n_samples, n_components):** 539 The transformed SNP data projected onto the `n_components` principal components. 540 n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 541 (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 542 `(n_samples * 2, n_components)`. 543 """ 544 return self.__X_new_ 545 546 @X_new_.setter 547 def X_new_(self, x: np.ndarray) -> None: 548 """ 549 Update `X_new_`. 550 """ 551 self.__X_new_ = x 552 553 @property 554 def haplotypes_(self) -> Optional[List[str]]: 555 """ 556 Retrieve `haplotypes_`. 557 558 Returns: 559 list of str: 560 A list of unique haplotype identifiers. 561 """ 562 if isinstance(self.__haplotypes_, np.ndarray): 563 return self.__haplotypes_.ravel().tolist() # Flatten and convert NumPy array to a list 564 elif isinstance(self.__haplotypes_, list): 565 if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray): 566 return self.__haplotypes_[0].ravel().tolist() # Handle list containing a single array 567 return self.__haplotypes_ # Already a flat list 568 elif self.__haplotypes_ is None: 569 return None # If no haplotypes are set 570 else: 571 raise TypeError("`haplotypes_` must be a list or a NumPy array.") 572 573 @haplotypes_.setter 574 def haplotypes_(self, x: Union[np.ndarray, List[str]]) -> None: 575 """ 576 Update `haplotypes_`. 577 """ 578 if isinstance(x, np.ndarray): 579 self.__haplotypes_ = x.ravel().tolist() # Flatten and convert to a list 580 elif isinstance(x, list): 581 if len(x) == 1 and isinstance(x[0], np.ndarray): # Handle list containing a single array 582 self.__haplotypes_ = x[0].ravel().tolist() 583 else: 584 self.__haplotypes_ = x # Use directly if already a list 585 else: 586 raise TypeError("`x` must be a list or a NumPy array.") 587 588 @property 589 def samples_(self) -> Optional[List[str]]: 590 """ 591 Retrieve `samples_`. 592 593 Returns: 594 list of str: 595 A list of sample identifiers based on `haplotypes_` and `average_strands`. 596 """ 597 haplotypes = self.haplotypes_ 598 if haplotypes is None: 599 return None 600 if self.__average_strands: 601 return haplotypes 602 else: 603 return [x[:-2] for x in haplotypes] 604 605 @property 606 def variants_id_(self) -> Optional[np.ndarray]: 607 """ 608 Retrieve `variants_id_`. 609 610 Returns: 611 **array of shape (n_snp,):** 612 An array containing unique identifiers (IDs) for each SNP, 613 potentially reduced if there are SNPs not present in the `laiobj`. 614 The format will depend on `rsid_or_chrompos`. 615 """ 616 return self.__variants_id_ 617 618 @variants_id_.setter 619 def variants_id_(self, x: np.ndarray) -> None: 620 """ 621 Update `variants_id_`. 622 """ 623 self.__variants_id_ = x 624 625 @property 626 def n_haplotypes(self) -> Optional[int]: 627 """ 628 Retrieve `n_haplotypes`. 629 630 Returns: 631 **int:** 632 The total number of haplotypes, potentially reduced if filtering is applied 633 (`min_percent_snps > 0`). 634 """ 635 return len(self.haplotypes_) 636 637 @property 638 def n_samples(self) -> Optional[int]: 639 """ 640 Retrieve `n_samples`. 641 642 Returns: 643 **int:** 644 The total number of samples, potentially reduced if filtering is applied 645 (`min_percent_snps > 0`). 646 """ 647 return len(np.unique(self.samples_)) 648 649 def copy(self) -> 'mdPCA': 650 """ 651 Create and return a copy of `self`. 652 653 Returns: 654 **mdPCA:** 655 A new instance of the current object. 656 """ 657 return copy.copy(self) 658 659 @staticmethod 660 def _define_ancestry(ancestry, ancestry_map): 661 """ 662 Determine the ancestry index based on different input types. 663 664 Args: 665 ancestry (int or str): The ancestry input, which can be: 666 - An integer (e.g., 0, 1, 2). 667 - A string representation of an integer (e.g., '0', '1'). 668 - A string matching one of the ancestry map values (e.g., 'Africa'). 669 ancestry_map (dict): A dictionary mapping ancestry indices (as strings) to ancestry names. 670 671 Returns: 672 int: The corresponding ancestry index. 673 """ 674 if isinstance(ancestry, int): 675 return ancestry 676 elif isinstance(ancestry, str) and ancestry.isdigit(): 677 return int(ancestry) 678 elif ancestry in ancestry_map.values(): 679 return int(next(key for key, value in ancestry_map.items() if value == ancestry)) 680 else: 681 raise ValueError(f"Invalid ancestry input: {ancestry}") 682 683 def _load_mask_file(self): 684 """ 685 Load previously saved masked genotype data from an `.npz` file. 686 """ 687 mask_files = np.load(self.masks_file, allow_pickle=True) 688 mask = mask_files['mask'] 689 rs_ID_list = mask_files['rs_ID_list'] 690 ind_ID_list = mask_files['ind_ID_list'] 691 labels = mask_files['labels'] 692 weights = mask_files['weights'] 693 return mask, rs_ID_list, ind_ID_list, labels, weights 694 695 @staticmethod 696 def _compute_strength_vector(X): 697 strength_vector = np.sum(~np.isnan(X), axis=1) / X.shape[1] 698 return strength_vector 699 700 @staticmethod 701 def _compute_strength_matrix(X): 702 notmask = (~np.isnan(X)).astype(np.float32) 703 strength_matrix = np.dot(notmask, notmask.T) 704 strength_matrix /= X.shape[1] 705 return strength_matrix 706 707 def _cov(self, x): 708 ddof = 1 709 x = np.ma.array(x, ndmin=2, copy=True, dtype=np.float32) 710 xmask = np.ma.getmaskarray(x) 711 xnotmask = np.logical_not(xmask).astype(np.float32) 712 fact = np.dot(xnotmask, xnotmask.T) * 1. - ddof 713 del xnotmask 714 gc.collect() 715 result = (np.ma.dot(x, x.T, strict=False) / fact).squeeze() 716 x = x.data 717 strength_vec = self._compute_strength_vector(x) 718 strength_mat = self._compute_strength_matrix(x) 719 return result.data, strength_vec, strength_mat 720 721 @staticmethod 722 def _demean(S, w): 723 w_sum = np.matmul(w, S) 724 w_rowsum = w_sum.reshape((1, S.shape[0])) 725 w_colsum = w_sum.reshape((S.shape[0], 1)) 726 w_totalsum = np.dot(w_sum, w) 727 S -= (w_rowsum + w_colsum) - w_totalsum 728 return S 729 730 @staticmethod 731 def _iterative_svd_matrix_completion(cov): 732 num_masked = np.isnan(cov).sum() 733 if num_masked > 0: 734 start_time = time.time() 735 start_rank = 1 736 end_rank = 10 737 rank = 5 738 choose_best = True 739 num_cores = 5 740 cov_complete = IterativeSVD(start_rank=start_rank, end_rank=end_rank, rank=rank, choose_best=choose_best, num_cores=num_cores).fit_transform(cov) 741 logging.info("Iterative SVD --- %s seconds ---" % (time.time() - start_time)) 742 else: 743 cov_complete = cov 744 return cov_complete 745 746 def _weighted_cov_pca(self, X_incomplete, weights, covariance_matrix_file, n_components): 747 start_time = time.time() 748 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 749 S, _, _ = self._cov(X_incomplete) 750 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 751 weights_normalized = weights / weights.sum() 752 S = self._demean(S, weights_normalized) 753 W = np.diag(weights) 754 WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W)) 755 if covariance_matrix_file: 756 np.save(covariance_matrix_file, S.data) 757 svd = TruncatedSVD(n_components, algorithm="arpack") 758 svd.fit(WSW) 759 U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T 760 D = np.diag(np.sqrt(svd.singular_values_)) 761 T = np.matmul(U, D) 762 total_var = np.trace(S) 763 for i in range(n_components): 764 pc_percentvar = 100 * svd.singular_values_[i] / total_var 765 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 766 return T 767 768 @staticmethod 769 def _create_validation_mask(X_incomplete, percent_inds): 770 np.random.seed(1) 771 masked_rows = np.isnan(X_incomplete).any(axis=1) 772 masked_inds = np.flatnonzero(masked_rows) 773 X_masked = X_incomplete[masked_rows] 774 percent_masked = 100 * np.isnan(X_masked).sum() / (X_masked.shape[0] * X_masked.shape[1]) 775 unmasked_rows = ~masked_rows 776 X_unmasked = X_incomplete[unmasked_rows] 777 masked_rows = np.random.choice(range(X_unmasked.shape[0]), size=int(X_unmasked.shape[0] * percent_inds / 100), replace=False) 778 X_masked_rows = X_unmasked[masked_rows,:] 779 mask = np.zeros(X_masked_rows.shape[0] * X_masked_rows.shape[1], dtype=np.int8) 780 mask[:int(X_masked_rows.shape[0] * X_masked_rows.shape[1] * percent_masked / 100)] = 1 781 np.random.shuffle(mask) 782 mask = mask.astype(bool) 783 mask = mask.reshape(X_masked_rows.shape) 784 X_masked_rows[mask] = np.nan 785 X_unmasked[masked_rows] = X_masked_rows 786 X_incomplete[unmasked_rows] = X_unmasked 787 masked_rows_new = np.isnan(X_incomplete).any(axis=1) 788 masked_inds_new = np.flatnonzero(masked_rows_new) 789 masked_inds_val = sorted(list(set(masked_inds_new) - set(masked_inds))) 790 return X_incomplete, masked_inds_val 791 792 @staticmethod 793 def _create_cov_mask(cov, strength_mat, percent_vals_masked): 794 cov_flattened = cov.reshape(-1).copy() 795 num_vals_masked = int(percent_vals_masked * len(cov_flattened) / 100) 796 pos_masked = strength_mat.reshape(-1).argsort()[:num_vals_masked] 797 cov_flattened[pos_masked] = np.nan 798 cov_masked = cov_flattened.reshape(cov.shape) 799 return cov_masked 800 801 def _regularized_optimization_ils(self, X_incomplete, covariance_matrix_file, n_components): 802 def run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file): 803 start_time = time.time() 804 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 805 has_missing = np.isnan(X_incomplete.data).sum() > 0 806 S, _, strength_mat = self._cov(X_incomplete) 807 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 808 S = self._demean(S, np.ones(S.shape[0]) / S.shape[0]) 809 if has_missing: 810 logging.info("Starting matrix completion. This will take a few minutes...") 811 start_time = time.time() 812 percent_inds_val = 20 # Percent of unmasked individuals to be masked for cross-validation 813 X_incomplete, masked_inds_val = self._create_validation_mask(X_incomplete.data, percent_inds_val) # masked_inds_val is the list of indices of the individuals masked for validation 814 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 815 S_prime, _, w_mat_prime = self._cov(X_incomplete) 816 del X_incomplete 817 gc.collect() 818 S_prime = self._demean(S_prime, np.ones(S_prime.shape[0]) / S_prime.shape[0]) 819 S_robust, lam = self.matrix_completion(S, strength_mat, S_prime, w_mat_prime, lams=None, method="NN", 820 cv_inds=masked_inds_val) 821 logging.info(f"Covariance Matrix --- %{time.time() - start_time:.2}s seconds ---") 822 else: 823 S_robust = S.copy() 824 if covariance_matrix_file: 825 np.save(covariance_matrix_file, S.data) 826 if has_missing: 827 base, ext = os.path.splitext(covariance_matrix_file) 828 np.save(f"{base}_completed_{lam}{ext}", S_robust.data) 829 return S, S_robust 830 831 def _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components): 832 _, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))) 833 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 834 missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0) 835 missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True) 836 svd = TruncatedSVD(n_components, algorithm="arpack") 837 svd.fit(S_robust) 838 U = svd.components_.T 839 U_nonmissing = U[nonmissing] 840 D_nonmissing = np.diag(np.sqrt(svd.singular_values_)) 841 T = np.zeros((X_incomplete.shape[0], n_components)) 842 T_filled = np.matmul(U_nonmissing, D_nonmissing) 843 i_filled = nonmissing 844 T[nonmissing] = T_filled 845 for i in missing_sorted: 846 S_i = S[i][i_filled] 847 W_i = np.diag(strength_mat[i][i_filled]) 848 A = np.matmul(W_i, T_filled) 849 b = np.matmul(W_i, S_i) 850 t = np.linalg.lstsq(A, b, rcond=None)[0] 851 T[i] = t 852 i_filled = np.append(i_filled, i) 853 T_filled = np.append(T_filled, [t], axis=0) 854 total_var = np.trace(S) 855 for i in range(n_components): 856 pc_percentvar = 100 * svd.singular_values_[i] / total_var 857 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 858 return T 859 860 S, S_robust = run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file) 861 T = _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components) 862 return T 863 864 def _cov_matrix_imputation(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components): 865 start_time = time.time() 866 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 867 S, strength_vec, strength_mat = self._cov(X_incomplete) 868 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 869 S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked) 870 S = self._iterative_svd_matrix_completion(S_masked) 871 weights = np.ones(len(strength_vec)) 872 weights_normalized = weights / weights.sum() 873 S = self._demean(S, weights_normalized) 874 W = np.diag(weights) 875 WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W)) 876 if covariance_matrix_file: 877 np.save(covariance_matrix_file, S.data) 878 svd = TruncatedSVD(n_components, algorithm="arpack") 879 svd.fit(WSW) 880 U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T 881 D = np.diag(np.sqrt(svd.singular_values_)) 882 T = np.matmul(U, D) 883 total_var = np.trace(S) 884 for i in range(n_components): 885 pc_percentvar = 100 * svd.singular_values_[i] / total_var 886 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 887 return T 888 889 def _cov_matrix_imputation_ils(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components): 890 891 def run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file): 892 start_time = time.time() 893 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 894 S, strength_vec, strength_mat = self._cov(X_incomplete) 895 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 896 S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked) 897 S = self._iterative_svd_matrix_completion(S_masked) 898 weights = np.ones(len(strength_vec)) 899 weights_normalized = weights / weights.sum() 900 S = self._demean(S, weights_normalized) 901 W = np.diag(weights) 902 if covariance_matrix_file: 903 np.save(covariance_matrix_file, S.data) 904 return S, W 905 906 def compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components): 907 S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))) 908 S = self._demean(S, np.ones(S.shape[0]) / S.shape[0]) 909 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 910 missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0) 911 missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True) 912 svd = TruncatedSVD(n_components, algorithm="arpack") 913 WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W)) 914 svd.fit(WSW) 915 U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T 916 U_nonmissing = U[nonmissing] 917 D_nonmissing = np.diag(np.sqrt(svd.singular_values_)) 918 T = np.zeros((X_incomplete.shape[0], n_components)) 919 T_filled = np.matmul(U_nonmissing, D_nonmissing) 920 i_filled = nonmissing 921 T[nonmissing] = T_filled 922 for i in missing_sorted: 923 S_i = S[i][i_filled] 924 W_i = np.diag(strength_mat[i][i_filled]) 925 A = np.matmul(W_i, T_filled) 926 b = np.matmul(W_i, S_i) 927 t = np.linalg.lstsq(A, b, rcond=None)[0] 928 T[i] = t 929 i_filled = np.append(i_filled, i) 930 T_filled = np.append(T_filled, [t], axis=0) 931 total_var = np.trace(S) 932 for i in range(n_components): 933 pc_percentvar = 100 * svd.singular_values_[i] / total_var 934 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 935 return T 936 937 S, W = run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file) 938 T = compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components) 939 return T 940 941 def _nonmissing_pca_ils(self, X_incomplete, covariance_matrix_file, n_components): 942 943 def run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file): 944 start_time = time.time() 945 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 946 X_incomplete_nonmissing = X_incomplete[nonmissing] 947 X_incomplete_nonmissing = np.ma.array(X_incomplete_nonmissing, mask=np.isnan(X_incomplete_nonmissing)) 948 S_nonmissing, _, _ = self._cov(X_incomplete_nonmissing) 949 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 950 S_nonmissing = self._demean(S_nonmissing, np.ones(S_nonmissing.shape[0]) / S_nonmissing.shape[0]) 951 if covariance_matrix_file: 952 np.save(covariance_matrix_file, S_nonmissing.data) 953 return S_nonmissing 954 955 def compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components): 956 S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))) 957 S = self._demean(S, np.ones(S.shape[0]) / S.shape[0]) 958 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 959 missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0) 960 missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True) 961 svd = TruncatedSVD(n_components, algorithm="arpack") 962 svd.fit(S_nonmissing) 963 U_nonmissing = svd.components_.T 964 D_nonmissing = np.diag(np.sqrt(svd.singular_values_)) 965 T = np.zeros((X_incomplete.shape[0], n_components)) 966 T_filled = np.matmul(U_nonmissing, D_nonmissing) 967 i_filled = nonmissing 968 T[nonmissing] = T_filled 969 for i in missing_sorted: 970 S_i = S[i][i_filled] 971 W_i = np.diag(strength_mat[i][i_filled]) 972 A = np.matmul(W_i, T_filled) 973 b = np.matmul(W_i, S_i) 974 t = np.linalg.lstsq(A, b, rcond=None)[0] 975 T[i] = t 976 i_filled = np.append(i_filled, i) 977 T_filled = np.append(T_filled, [t], axis=0) 978 total_var = np.trace(S) 979 for i in range(n_components): 980 pc_percentvar = 100 * svd.singular_values_[i] / total_var 981 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 982 return T 983 984 S_nonmissing = run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file) 985 T = compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components) 986 return T 987 988 def _run_cov_matrix(self, X_incomplete, weights): 989 """ 990 Runs the specified PCA method on the incomplete data. 991 """ 992 if self.method == "weighted_cov_pca": 993 return self._weighted_cov_pca(X_incomplete, weights, self.covariance_matrix_file, self.n_components) 994 elif self.method == "regularized_optimization_ils": 995 return self._regularized_optimization_ils(X_incomplete, self.covariance_matrix_file, self.n_components) 996 elif self.method == "cov_matrix_imputation": 997 return self._cov_matrix_imputation(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components) 998 elif self.method == "cov_matrix_imputation_ils": 999 return self._cov_matrix_imputation_ils(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components) 1000 elif self.method == "nonmissing_pca_ils": 1001 return self._nonmissing_pca_ils(X_incomplete, self.covariance_matrix_file, self.n_components) 1002 else: 1003 raise ValueError(f"Unsupported method: {self.method}") 1004 1005 def fit_transform( 1006 self, 1007 snpobj: Optional['SNPObject'] = None, 1008 laiobj: Optional['LocalAncestryObject'] = None, 1009 labels_file: Optional[str] = None, 1010 ancestry: Optional[str] = None, 1011 average_strands: Optional[bool] = None 1012 ) -> np.ndarray: 1013 """ 1014 Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data. 1015 1016 This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying 1017 masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these 1018 masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality 1019 while handling missing data as specified. 1020 1021 Args: 1022 snpobj (SNPObject, optional): 1023 A SNPObject instance. 1024 laiobj (LAIObject, optional): 1025 A LAIObject instance. 1026 labels_file (str, optional): 1027 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 1028 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 1029 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 1030 combined into groups, with respective weights. 1031 ancestry (str, optional): 1032 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 1033 average_strands (bool, optional): 1034 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 1035 If None, defaults to `self.average_strands`. 1036 1037 Returns: 1038 **array of shape (n_samples, n_components):** 1039 The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`. 1040 """ 1041 if snpobj is None: 1042 snpobj = self.snpobj 1043 if laiobj is None: 1044 laiobj = self.laiobj 1045 if labels_file is None: 1046 labels_file = self.labels_file 1047 if ancestry is None: 1048 ancestry = self.ancestry 1049 if average_strands is None: 1050 average_strands = self.average_strands 1051 1052 if self.load_masks: 1053 # Load precomputed ancestry-based masked genotype matrix, SNP identifiers, haplotype identifiers, and weights 1054 mask, variants_id, haplotypes, _, weights = self._load_mask_file() 1055 else: 1056 # Process genotype data with optional ancestry-based masking and return the corresponding SNP and individual identifiers 1057 mask, variants_id, haplotypes = process_calldata_gt( 1058 self.snpobj, 1059 self.laiobj, 1060 self.ancestry, 1061 self.average_strands, 1062 self.force_nan_incomplete_strands, 1063 self.is_masked, 1064 self.rsid_or_chrompos 1065 ) 1066 1067 # Process individual genomic labels and weights, aligning them with a masked genotype matrix by 1068 # filtering out low-coverage individuals, reordering data to match the matrix structure, and 1069 # handling group-based adjustments 1070 mask, haplotypes, _, weights = process_labels_weights( 1071 self.labels_file, 1072 mask, 1073 variants_id, 1074 haplotypes, 1075 self.average_strands, 1076 self.ancestry, 1077 self.min_percent_snps, 1078 self.group_snp_frequencies_only, 1079 self.groups_to_remove, 1080 self.is_weighted, 1081 self.save_masks, 1082 self.masks_file 1083 ) 1084 1085 # Call run_cov_matrix with the specified method 1086 self.X_new_ = self._run_cov_matrix( 1087 mask[self.ancestry].T, 1088 weights 1089 ) 1090 1091 self.haplotypes_ = haplotypes 1092 self.variants_id_ = variants_id 1093 1094 return self.X_new_
19class mdPCA: 20 """ 21 A class for performing missing data principal component analysis (mdPCA) on SNP data. 22 23 The mdPCA class focuses on genotype segments from the ancestry of interest when the `is_masked` flag is set to `True`. It offers 24 flexible processing options, allowing either separate handling of masked haplotype strands or combining (averaging) strands into a 25 single composite representation for each individual. Moreover, the analysis can be performed on individual-level data, group-level SNP 26 frequencies, or a combination of both. 27 28 If the `snpobj`, `laiobj`, `labels_file`, and `ancestry` parameters are all provided during instantiation, the `fit_transform` method 29 will be automatically called, applying the specified mdPCA method to transform the data upon instantiation. 30 """ 31 def __init__( 32 self, 33 method: str = 'weighted_cov_pca', 34 snpobj: Optional['SNPObject'] = None, 35 laiobj: Optional['LocalAncestryObject'] = None, 36 labels_file: Optional[str] = None, 37 ancestry: Optional[Union[int, str]] = None, 38 is_masked: bool = True, 39 average_strands: bool = False, 40 force_nan_incomplete_strands: bool = False, 41 is_weighted: bool = False, 42 groups_to_remove: List[str] = None, 43 min_percent_snps: float = 4, 44 group_snp_frequencies_only: bool = True, 45 save_masks: bool = False, 46 load_masks: bool = False, 47 masks_file: Union[str, pathlib.Path] = 'masks.npz', 48 output_file: Union[str, pathlib.Path] = 'output.tsv', 49 covariance_matrix_file: Optional[str] = None, 50 n_components: int = 2, 51 rsid_or_chrompos: int = 2, 52 percent_vals_masked: float = 0 53 ): 54 """ 55 Args: 56 method (str, default='weighted_cov_pca'): 57 The PCA method to use for dimensionality reduction. Options include: 58 - `'weighted_cov_pca'`: 59 Simple covariance-based PCA, weighted by sample strengths. 60 - `'regularized_optimization_ils'`: 61 Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of 62 missing samples using the original covariance matrix (considering only relevant elements not missing in 63 the original covariance matrix for those samples). 64 - `'cov_matrix_imputation'`: 65 Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values 66 using the Iterative SVD imputation method. 67 - `'cov_matrix_imputation_ils'`: 68 The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space 69 given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just 70 as done in 'regularized_optimization_ils'. 71 - `'nonmissing_pca_ils'`: 72 The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto 73 the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as 74 done in 'regularized_optimization_ils'. 75 snpobj (SNPObject, optional): 76 A SNPObject instance. 77 laiobj (LAIObject, optional): 78 A LAIObject instance. 79 labels_file (str, optional): 80 Path to a `.tsv` file with metadata on individuals, including population labels, optional weights, and groupings. 81 The `indID` column must contain unique individual identifiers matching those in `laiobj` and `snpobj` for proper alignment. 82 The `label` column assigns population groups. If `is_weighted=True`, a `weight` column must be provided, assigning a weight to 83 each individual, where those with a weight of zero are removed. Optional columns include `combination` and `combination_weight` 84 to aggregate individuals into combined groups, where SNP frequencies represent their sequences. The `combination` column assigns 85 each individual to a specific group (0 for no combination, 1 for the first group, 2 for the second, etc.). All members of a group 86 must share the same `label` and `combination_weight`. If `combination_weight` column is not provided, the combinations are 87 assigned a default weight of `1`. Individuals excluded via `groups_to_remove` or those falling below `min_percent_snps` are removed 88 from the analysis. 89 ancestry (int or str, optional): 90 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. The ancestry input can be: 91 - An integer (e.g., 0, 1, 2). 92 - A string representation of an integer (e.g., '0', '1'). 93 - A string matching one of the ancestry map values (e.g., 'Africa'). 94 is_masked (bool, default=True): 95 If `True`, applies ancestry-specific masking to the genotype matrix, retaining only genotype data 96 corresponding to the specified `ancestry`. If `False`, uses the full, unmasked genotype matrix. 97 average_strands (bool, default=False): 98 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 99 force_nan_incomplete_strands (bool): 100 If `True`, sets the result to NaN if either haplotype in a pair is NaN. 101 Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). 102 is_weighted (bool, default=False): 103 If `True`, assigns individual weights from the `weight` column in `labels_file`. Otherwise, all individuals have equal weight of `1`. 104 groups_to_remove (list of str, optional): 105 List with groups to exclude from analysis. Example: ['group1', 'group2']. 106 min_percent_snps (float, default=4): 107 Minimum percentage of SNPs that must be known for an individual and of the ancestry of interest to be included in the analysis. 108 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 109 group_snp_frequencies_only (bool, default=True): 110 If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when `is_weighted` is 111 set to True and a `combination` column is provided in the `labels_file`, meaning individuals are aggregated into groups based on their assigned 112 labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies when 113 `is_weighted` is True and a `combination` column is provided. 114 save_masks (bool, default=False): 115 True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 116 load_masks (bool, default=False): 117 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise. 118 masks_file (str or pathlib.Path, default='masks.npz'): 119 Path to the `.npz` file used for saving/loading masked matrices. 120 output_file (str or pathlib.Path, default='output.tsv'): 121 Path to the output `.tsv` file where mdPCA results are saved. 122 covariance_matrix_file (str, optional): 123 Path to save the covariance matrix file in `.npy` format. If None, the covariance matrix is not saved. Default is None. 124 n_components (int, default=2): 125 The number of principal components. 126 rsid_or_chrompos (int, default=2): 127 Format indicator for SNP IDs in `self.__X_new_`. Use 1 for `rsID` format or 2 for `chromosome_position`. 128 percent_vals_masked (float, default=0): 129 Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 130 `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`. 131 """ 132 self.__snpobj = snpobj 133 self.__laiobj = laiobj 134 self.__labels_file = labels_file 135 self.__ancestry = self._define_ancestry(ancestry, laiobj.ancestry_map) 136 self.__method = method 137 self.__is_masked = is_masked 138 self.__average_strands = average_strands 139 self.__force_nan_incomplete_strands = force_nan_incomplete_strands 140 self.__is_weighted = is_weighted 141 self.__groups_to_remove = groups_to_remove 142 self.__min_percent_snps = min_percent_snps 143 self.__group_snp_frequencies_only = group_snp_frequencies_only 144 self.__save_masks = save_masks 145 self.__load_masks = load_masks 146 self.__masks_file = masks_file 147 self.__output_file = output_file 148 self.__covariance_matrix_file = covariance_matrix_file 149 self.__n_components = n_components 150 self.__rsid_or_chrompos = rsid_or_chrompos 151 self.__percent_vals_masked = percent_vals_masked 152 self.__X_new_ = None # Store transformed SNP data 153 self.__haplotypes_ = None # Store haplotypes of X_new_ (after filtering if min_percent_snps > 0) 154 self.__samples_ = None # Store samples of X_new_ (after filtering if min_percent_snps > 0) 155 self.__variants_id_ = None # Store variants ID (after filtering SNPs not in laiobj) 156 157 # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided 158 if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None: 159 self.fit_transform(snpobj, laiobj, labels_file, ancestry) 160 161 def __getitem__(self, key): 162 """ 163 To access an attribute of the class using the square bracket notation, 164 similar to a dictionary. 165 """ 166 try: 167 return getattr(self, key) 168 except AttributeError: 169 raise KeyError(f'Invalid key: {key}') 170 171 def __setitem__(self, key, value): 172 """ 173 To set an attribute of the class using the square bracket notation, 174 similar to a dictionary. 175 """ 176 try: 177 setattr(self, key, value) 178 except AttributeError: 179 raise KeyError(f'Invalid key: {key}') 180 181 @property 182 def method(self) -> str: 183 """ 184 Retrieve `method`. 185 186 Returns: 187 **str:** The PCA method to use for dimensionality reduction. 188 """ 189 return self.__method 190 191 @method.setter 192 def method(self, x: str) -> None: 193 """ 194 Update `method`. 195 """ 196 self.__method = x 197 198 @property 199 def snpobj(self) -> Optional['SNPObject']: 200 """ 201 Retrieve `snpobj`. 202 203 Returns: 204 **SNPObject:** A SNPObject instance. 205 """ 206 return self.__snpobj 207 208 @snpobj.setter 209 def snpobj(self, x: 'SNPObject') -> None: 210 """ 211 Update `snpobj`. 212 """ 213 self.__snpobj = x 214 215 @property 216 def laiobj(self) -> Optional['LocalAncestryObject']: 217 """ 218 Retrieve `laiobj`. 219 220 Returns: 221 **LocalAncestryObject:** A LocalAncestryObject instance. 222 """ 223 return self.__laiobj 224 225 @laiobj.setter 226 def laiobj(self, x: 'LocalAncestryObject') -> None: 227 """ 228 Update `laiobj`. 229 """ 230 self.__laiobj = x 231 232 @property 233 def labels_file(self) -> Optional[str]: 234 """ 235 Retrieve `labels_file`. 236 237 Returns: 238 **str:** Path to the labels file in `.tsv` format. 239 """ 240 return self.__labels_file 241 242 @labels_file.setter 243 def labels_file(self, x: str) -> None: 244 """ 245 Update `labels_file`. 246 """ 247 self.__labels_file = x 248 249 @property 250 def ancestry(self) -> Optional[str]: 251 """ 252 Retrieve `ancestry`. 253 254 Returns: 255 **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 256 """ 257 return self.__ancestry 258 259 @ancestry.setter 260 def ancestry(self, x: str) -> None: 261 """ 262 Update `ancestry`. 263 """ 264 self.__ancestry = x 265 266 @property 267 def is_masked(self) -> bool: 268 """ 269 Retrieve `is_masked`. 270 271 Returns: 272 **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise. 273 """ 274 return self.__is_masked 275 276 @is_masked.setter 277 def is_masked(self, x: bool) -> None: 278 """ 279 Update `is_masked`. 280 """ 281 self.__is_masked = x 282 283 @property 284 def average_strands(self) -> bool: 285 """ 286 Retrieve `average_strands`. 287 288 Returns: 289 **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 290 """ 291 return self.__average_strands 292 293 @average_strands.setter 294 def average_strands(self, x: bool) -> None: 295 """ 296 Update `average_strands`. 297 """ 298 self.__average_strands = x 299 300 @property 301 def force_nan_incomplete_strands(self) -> bool: 302 """ 303 Retrieve `force_nan_incomplete_strands`. 304 305 Returns: 306 **bool**: If `True`, sets the result to NaN if either haplotype in a pair is NaN. 307 Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). 308 """ 309 return self.__force_nan_incomplete_strands 310 311 @force_nan_incomplete_strands.setter 312 def force_nan_incomplete_strands(self, x: bool) -> None: 313 """ 314 Update `force_nan_incomplete_strands`. 315 """ 316 self.__force_nan_incomplete_strands = x 317 318 @property 319 def is_weighted(self) -> bool: 320 """ 321 Retrieve `is_weighted`. 322 323 Returns: 324 **bool:** True if weights are provided in the labels file, or False otherwise. 325 """ 326 return self.__is_weighted 327 328 @is_weighted.setter 329 def is_weighted(self, x: bool) -> None: 330 """ 331 Update `is_weighted`. 332 """ 333 self.__is_weighted = x 334 335 @property 336 def groups_to_remove(self) -> Dict[int, List[str]]: 337 """ 338 Retrieve `groups_to_remove`. 339 340 Returns: 341 **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 342 lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 343 """ 344 return self.__groups_to_remove 345 346 @groups_to_remove.setter 347 def groups_to_remove(self, x: Dict[int, List[str]]) -> None: 348 """ 349 Update `groups_to_remove`. 350 """ 351 self.__groups_to_remove = x 352 353 @property 354 def min_percent_snps(self) -> float: 355 """ 356 Retrieve `min_percent_snps`. 357 358 Returns: 359 **float:** 360 Minimum percentage of SNPs that must be known for an individual to be included in the analysis. 361 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 362 """ 363 return self.__min_percent_snps 364 365 @min_percent_snps.setter 366 def min_percent_snps(self, x: float) -> None: 367 """ 368 Update `min_percent_snps`. 369 """ 370 self.__min_percent_snps = x 371 372 @property 373 def group_snp_frequencies_only(self) -> bool: 374 """ 375 Retrieve `group_snp_frequencies_only`. 376 377 Returns: 378 **bool:** 379 If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies 380 when `is_weighted` is set to True and a `combination` column is provided in the `labels_file`, meaning individuals are 381 aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone 382 or on both individual-level and group-level SNP frequencies when `is_weighted` is True and a `combination` column is provided. 383 """ 384 return self.__group_snp_frequencies_only 385 386 @group_snp_frequencies_only.setter 387 def group_snp_frequencies_only(self, x: bool) -> None: 388 """ 389 Update `group_snp_frequencies_only`. 390 """ 391 self.__group_snp_frequencies_only = x 392 393 @property 394 def save_masks(self) -> bool: 395 """ 396 Retrieve `save_masks`. 397 398 Returns: 399 **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 400 """ 401 return self.__save_masks 402 403 @save_masks.setter 404 def save_masks(self, x: bool) -> None: 405 """ 406 Update `save_masks`. 407 """ 408 self.__save_masks = x 409 410 @property 411 def load_masks(self) -> bool: 412 """ 413 Retrieve `load_masks`. 414 415 Returns: 416 **bool:** 417 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 418 by `masks_file`, or False otherwise. 419 """ 420 return self.__load_masks 421 422 @load_masks.setter 423 def load_masks(self, x: bool) -> None: 424 """ 425 Update `load_masks`. 426 """ 427 self.__load_masks = x 428 429 @property 430 def masks_file(self) -> Union[str, pathlib.Path]: 431 """ 432 Retrieve `masks_file`. 433 434 Returns: 435 **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices. 436 """ 437 return self.__masks_file 438 439 @masks_file.setter 440 def masks_file(self, x: Union[str, pathlib.Path]) -> None: 441 """ 442 Update `masks_file`. 443 """ 444 self.__masks_file = x 445 446 @property 447 def output_file(self) -> Union[str, pathlib.Path]: 448 """ 449 Retrieve `output_file`. 450 451 Returns: 452 **str or pathlib.Path:** Path to the output `.tsv` file where mdPCA results are saved. 453 """ 454 return self.__output_file 455 456 @output_file.setter 457 def output_file(self, x: Union[str, pathlib.Path]) -> None: 458 """ 459 Update `output_file`. 460 """ 461 self.__output_file = x 462 463 @property 464 def covariance_matrix_file(self) -> Optional[str]: 465 """ 466 Retrieve `covariance_matrix_file`. 467 468 Returns: 469 **str:** Path to save the covariance matrix file in `.npy` format. 470 """ 471 return self.__covariance_matrix_file 472 473 @covariance_matrix_file.setter 474 def covariance_matrix_file(self, x: Optional[str]) -> None: 475 """ 476 Update `covariance_matrix_file`. 477 """ 478 self.__covariance_matrix_file = x 479 480 @property 481 def n_components(self) -> int: 482 """ 483 Retrieve `n_components`. 484 485 Returns: 486 **int:** The number of principal components. 487 """ 488 return self.__n_components 489 490 @n_components.setter 491 def n_components(self, x: int) -> None: 492 """ 493 Update `n_components`. 494 """ 495 self.__n_components = x 496 497 @property 498 def rsid_or_chrompos(self) -> int: 499 """ 500 Retrieve `rsid_or_chrompos`. 501 502 Returns: 503 **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 504 """ 505 return self.__rsid_or_chrompos 506 507 @rsid_or_chrompos.setter 508 def rsid_or_chrompos(self, x: int) -> None: 509 """ 510 Update `rsid_or_chrompos`. 511 """ 512 self.__rsid_or_chrompos = x 513 514 @property 515 def percent_vals_masked(self) -> float: 516 """ 517 Retrieve `percent_vals_masked`. 518 519 Returns: 520 **float:** 521 Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 522 `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`. 523 """ 524 return self.__percent_vals_masked 525 526 @percent_vals_masked.setter 527 def percent_vals_masked(self, x: float) -> None: 528 """ 529 Update `percent_vals_masked`. 530 """ 531 self.__percent_vals_masked = x 532 533 @property 534 def X_new_(self) -> Optional[np.ndarray]: 535 """ 536 Retrieve `X_new_`. 537 538 Returns: 539 **array of shape (n_samples, n_components):** 540 The transformed SNP data projected onto the `n_components` principal components. 541 n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 542 (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 543 `(n_samples * 2, n_components)`. 544 """ 545 return self.__X_new_ 546 547 @X_new_.setter 548 def X_new_(self, x: np.ndarray) -> None: 549 """ 550 Update `X_new_`. 551 """ 552 self.__X_new_ = x 553 554 @property 555 def haplotypes_(self) -> Optional[List[str]]: 556 """ 557 Retrieve `haplotypes_`. 558 559 Returns: 560 list of str: 561 A list of unique haplotype identifiers. 562 """ 563 if isinstance(self.__haplotypes_, np.ndarray): 564 return self.__haplotypes_.ravel().tolist() # Flatten and convert NumPy array to a list 565 elif isinstance(self.__haplotypes_, list): 566 if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray): 567 return self.__haplotypes_[0].ravel().tolist() # Handle list containing a single array 568 return self.__haplotypes_ # Already a flat list 569 elif self.__haplotypes_ is None: 570 return None # If no haplotypes are set 571 else: 572 raise TypeError("`haplotypes_` must be a list or a NumPy array.") 573 574 @haplotypes_.setter 575 def haplotypes_(self, x: Union[np.ndarray, List[str]]) -> None: 576 """ 577 Update `haplotypes_`. 578 """ 579 if isinstance(x, np.ndarray): 580 self.__haplotypes_ = x.ravel().tolist() # Flatten and convert to a list 581 elif isinstance(x, list): 582 if len(x) == 1 and isinstance(x[0], np.ndarray): # Handle list containing a single array 583 self.__haplotypes_ = x[0].ravel().tolist() 584 else: 585 self.__haplotypes_ = x # Use directly if already a list 586 else: 587 raise TypeError("`x` must be a list or a NumPy array.") 588 589 @property 590 def samples_(self) -> Optional[List[str]]: 591 """ 592 Retrieve `samples_`. 593 594 Returns: 595 list of str: 596 A list of sample identifiers based on `haplotypes_` and `average_strands`. 597 """ 598 haplotypes = self.haplotypes_ 599 if haplotypes is None: 600 return None 601 if self.__average_strands: 602 return haplotypes 603 else: 604 return [x[:-2] for x in haplotypes] 605 606 @property 607 def variants_id_(self) -> Optional[np.ndarray]: 608 """ 609 Retrieve `variants_id_`. 610 611 Returns: 612 **array of shape (n_snp,):** 613 An array containing unique identifiers (IDs) for each SNP, 614 potentially reduced if there are SNPs not present in the `laiobj`. 615 The format will depend on `rsid_or_chrompos`. 616 """ 617 return self.__variants_id_ 618 619 @variants_id_.setter 620 def variants_id_(self, x: np.ndarray) -> None: 621 """ 622 Update `variants_id_`. 623 """ 624 self.__variants_id_ = x 625 626 @property 627 def n_haplotypes(self) -> Optional[int]: 628 """ 629 Retrieve `n_haplotypes`. 630 631 Returns: 632 **int:** 633 The total number of haplotypes, potentially reduced if filtering is applied 634 (`min_percent_snps > 0`). 635 """ 636 return len(self.haplotypes_) 637 638 @property 639 def n_samples(self) -> Optional[int]: 640 """ 641 Retrieve `n_samples`. 642 643 Returns: 644 **int:** 645 The total number of samples, potentially reduced if filtering is applied 646 (`min_percent_snps > 0`). 647 """ 648 return len(np.unique(self.samples_)) 649 650 def copy(self) -> 'mdPCA': 651 """ 652 Create and return a copy of `self`. 653 654 Returns: 655 **mdPCA:** 656 A new instance of the current object. 657 """ 658 return copy.copy(self) 659 660 @staticmethod 661 def _define_ancestry(ancestry, ancestry_map): 662 """ 663 Determine the ancestry index based on different input types. 664 665 Args: 666 ancestry (int or str): The ancestry input, which can be: 667 - An integer (e.g., 0, 1, 2). 668 - A string representation of an integer (e.g., '0', '1'). 669 - A string matching one of the ancestry map values (e.g., 'Africa'). 670 ancestry_map (dict): A dictionary mapping ancestry indices (as strings) to ancestry names. 671 672 Returns: 673 int: The corresponding ancestry index. 674 """ 675 if isinstance(ancestry, int): 676 return ancestry 677 elif isinstance(ancestry, str) and ancestry.isdigit(): 678 return int(ancestry) 679 elif ancestry in ancestry_map.values(): 680 return int(next(key for key, value in ancestry_map.items() if value == ancestry)) 681 else: 682 raise ValueError(f"Invalid ancestry input: {ancestry}") 683 684 def _load_mask_file(self): 685 """ 686 Load previously saved masked genotype data from an `.npz` file. 687 """ 688 mask_files = np.load(self.masks_file, allow_pickle=True) 689 mask = mask_files['mask'] 690 rs_ID_list = mask_files['rs_ID_list'] 691 ind_ID_list = mask_files['ind_ID_list'] 692 labels = mask_files['labels'] 693 weights = mask_files['weights'] 694 return mask, rs_ID_list, ind_ID_list, labels, weights 695 696 @staticmethod 697 def _compute_strength_vector(X): 698 strength_vector = np.sum(~np.isnan(X), axis=1) / X.shape[1] 699 return strength_vector 700 701 @staticmethod 702 def _compute_strength_matrix(X): 703 notmask = (~np.isnan(X)).astype(np.float32) 704 strength_matrix = np.dot(notmask, notmask.T) 705 strength_matrix /= X.shape[1] 706 return strength_matrix 707 708 def _cov(self, x): 709 ddof = 1 710 x = np.ma.array(x, ndmin=2, copy=True, dtype=np.float32) 711 xmask = np.ma.getmaskarray(x) 712 xnotmask = np.logical_not(xmask).astype(np.float32) 713 fact = np.dot(xnotmask, xnotmask.T) * 1. - ddof 714 del xnotmask 715 gc.collect() 716 result = (np.ma.dot(x, x.T, strict=False) / fact).squeeze() 717 x = x.data 718 strength_vec = self._compute_strength_vector(x) 719 strength_mat = self._compute_strength_matrix(x) 720 return result.data, strength_vec, strength_mat 721 722 @staticmethod 723 def _demean(S, w): 724 w_sum = np.matmul(w, S) 725 w_rowsum = w_sum.reshape((1, S.shape[0])) 726 w_colsum = w_sum.reshape((S.shape[0], 1)) 727 w_totalsum = np.dot(w_sum, w) 728 S -= (w_rowsum + w_colsum) - w_totalsum 729 return S 730 731 @staticmethod 732 def _iterative_svd_matrix_completion(cov): 733 num_masked = np.isnan(cov).sum() 734 if num_masked > 0: 735 start_time = time.time() 736 start_rank = 1 737 end_rank = 10 738 rank = 5 739 choose_best = True 740 num_cores = 5 741 cov_complete = IterativeSVD(start_rank=start_rank, end_rank=end_rank, rank=rank, choose_best=choose_best, num_cores=num_cores).fit_transform(cov) 742 logging.info("Iterative SVD --- %s seconds ---" % (time.time() - start_time)) 743 else: 744 cov_complete = cov 745 return cov_complete 746 747 def _weighted_cov_pca(self, X_incomplete, weights, covariance_matrix_file, n_components): 748 start_time = time.time() 749 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 750 S, _, _ = self._cov(X_incomplete) 751 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 752 weights_normalized = weights / weights.sum() 753 S = self._demean(S, weights_normalized) 754 W = np.diag(weights) 755 WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W)) 756 if covariance_matrix_file: 757 np.save(covariance_matrix_file, S.data) 758 svd = TruncatedSVD(n_components, algorithm="arpack") 759 svd.fit(WSW) 760 U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T 761 D = np.diag(np.sqrt(svd.singular_values_)) 762 T = np.matmul(U, D) 763 total_var = np.trace(S) 764 for i in range(n_components): 765 pc_percentvar = 100 * svd.singular_values_[i] / total_var 766 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 767 return T 768 769 @staticmethod 770 def _create_validation_mask(X_incomplete, percent_inds): 771 np.random.seed(1) 772 masked_rows = np.isnan(X_incomplete).any(axis=1) 773 masked_inds = np.flatnonzero(masked_rows) 774 X_masked = X_incomplete[masked_rows] 775 percent_masked = 100 * np.isnan(X_masked).sum() / (X_masked.shape[0] * X_masked.shape[1]) 776 unmasked_rows = ~masked_rows 777 X_unmasked = X_incomplete[unmasked_rows] 778 masked_rows = np.random.choice(range(X_unmasked.shape[0]), size=int(X_unmasked.shape[0] * percent_inds / 100), replace=False) 779 X_masked_rows = X_unmasked[masked_rows,:] 780 mask = np.zeros(X_masked_rows.shape[0] * X_masked_rows.shape[1], dtype=np.int8) 781 mask[:int(X_masked_rows.shape[0] * X_masked_rows.shape[1] * percent_masked / 100)] = 1 782 np.random.shuffle(mask) 783 mask = mask.astype(bool) 784 mask = mask.reshape(X_masked_rows.shape) 785 X_masked_rows[mask] = np.nan 786 X_unmasked[masked_rows] = X_masked_rows 787 X_incomplete[unmasked_rows] = X_unmasked 788 masked_rows_new = np.isnan(X_incomplete).any(axis=1) 789 masked_inds_new = np.flatnonzero(masked_rows_new) 790 masked_inds_val = sorted(list(set(masked_inds_new) - set(masked_inds))) 791 return X_incomplete, masked_inds_val 792 793 @staticmethod 794 def _create_cov_mask(cov, strength_mat, percent_vals_masked): 795 cov_flattened = cov.reshape(-1).copy() 796 num_vals_masked = int(percent_vals_masked * len(cov_flattened) / 100) 797 pos_masked = strength_mat.reshape(-1).argsort()[:num_vals_masked] 798 cov_flattened[pos_masked] = np.nan 799 cov_masked = cov_flattened.reshape(cov.shape) 800 return cov_masked 801 802 def _regularized_optimization_ils(self, X_incomplete, covariance_matrix_file, n_components): 803 def run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file): 804 start_time = time.time() 805 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 806 has_missing = np.isnan(X_incomplete.data).sum() > 0 807 S, _, strength_mat = self._cov(X_incomplete) 808 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 809 S = self._demean(S, np.ones(S.shape[0]) / S.shape[0]) 810 if has_missing: 811 logging.info("Starting matrix completion. This will take a few minutes...") 812 start_time = time.time() 813 percent_inds_val = 20 # Percent of unmasked individuals to be masked for cross-validation 814 X_incomplete, masked_inds_val = self._create_validation_mask(X_incomplete.data, percent_inds_val) # masked_inds_val is the list of indices of the individuals masked for validation 815 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 816 S_prime, _, w_mat_prime = self._cov(X_incomplete) 817 del X_incomplete 818 gc.collect() 819 S_prime = self._demean(S_prime, np.ones(S_prime.shape[0]) / S_prime.shape[0]) 820 S_robust, lam = self.matrix_completion(S, strength_mat, S_prime, w_mat_prime, lams=None, method="NN", 821 cv_inds=masked_inds_val) 822 logging.info(f"Covariance Matrix --- %{time.time() - start_time:.2}s seconds ---") 823 else: 824 S_robust = S.copy() 825 if covariance_matrix_file: 826 np.save(covariance_matrix_file, S.data) 827 if has_missing: 828 base, ext = os.path.splitext(covariance_matrix_file) 829 np.save(f"{base}_completed_{lam}{ext}", S_robust.data) 830 return S, S_robust 831 832 def _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components): 833 _, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))) 834 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 835 missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0) 836 missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True) 837 svd = TruncatedSVD(n_components, algorithm="arpack") 838 svd.fit(S_robust) 839 U = svd.components_.T 840 U_nonmissing = U[nonmissing] 841 D_nonmissing = np.diag(np.sqrt(svd.singular_values_)) 842 T = np.zeros((X_incomplete.shape[0], n_components)) 843 T_filled = np.matmul(U_nonmissing, D_nonmissing) 844 i_filled = nonmissing 845 T[nonmissing] = T_filled 846 for i in missing_sorted: 847 S_i = S[i][i_filled] 848 W_i = np.diag(strength_mat[i][i_filled]) 849 A = np.matmul(W_i, T_filled) 850 b = np.matmul(W_i, S_i) 851 t = np.linalg.lstsq(A, b, rcond=None)[0] 852 T[i] = t 853 i_filled = np.append(i_filled, i) 854 T_filled = np.append(T_filled, [t], axis=0) 855 total_var = np.trace(S) 856 for i in range(n_components): 857 pc_percentvar = 100 * svd.singular_values_[i] / total_var 858 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 859 return T 860 861 S, S_robust = run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file) 862 T = _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components) 863 return T 864 865 def _cov_matrix_imputation(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components): 866 start_time = time.time() 867 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 868 S, strength_vec, strength_mat = self._cov(X_incomplete) 869 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 870 S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked) 871 S = self._iterative_svd_matrix_completion(S_masked) 872 weights = np.ones(len(strength_vec)) 873 weights_normalized = weights / weights.sum() 874 S = self._demean(S, weights_normalized) 875 W = np.diag(weights) 876 WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W)) 877 if covariance_matrix_file: 878 np.save(covariance_matrix_file, S.data) 879 svd = TruncatedSVD(n_components, algorithm="arpack") 880 svd.fit(WSW) 881 U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T 882 D = np.diag(np.sqrt(svd.singular_values_)) 883 T = np.matmul(U, D) 884 total_var = np.trace(S) 885 for i in range(n_components): 886 pc_percentvar = 100 * svd.singular_values_[i] / total_var 887 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 888 return T 889 890 def _cov_matrix_imputation_ils(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components): 891 892 def run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file): 893 start_time = time.time() 894 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 895 S, strength_vec, strength_mat = self._cov(X_incomplete) 896 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 897 S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked) 898 S = self._iterative_svd_matrix_completion(S_masked) 899 weights = np.ones(len(strength_vec)) 900 weights_normalized = weights / weights.sum() 901 S = self._demean(S, weights_normalized) 902 W = np.diag(weights) 903 if covariance_matrix_file: 904 np.save(covariance_matrix_file, S.data) 905 return S, W 906 907 def compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components): 908 S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))) 909 S = self._demean(S, np.ones(S.shape[0]) / S.shape[0]) 910 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 911 missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0) 912 missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True) 913 svd = TruncatedSVD(n_components, algorithm="arpack") 914 WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W)) 915 svd.fit(WSW) 916 U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T 917 U_nonmissing = U[nonmissing] 918 D_nonmissing = np.diag(np.sqrt(svd.singular_values_)) 919 T = np.zeros((X_incomplete.shape[0], n_components)) 920 T_filled = np.matmul(U_nonmissing, D_nonmissing) 921 i_filled = nonmissing 922 T[nonmissing] = T_filled 923 for i in missing_sorted: 924 S_i = S[i][i_filled] 925 W_i = np.diag(strength_mat[i][i_filled]) 926 A = np.matmul(W_i, T_filled) 927 b = np.matmul(W_i, S_i) 928 t = np.linalg.lstsq(A, b, rcond=None)[0] 929 T[i] = t 930 i_filled = np.append(i_filled, i) 931 T_filled = np.append(T_filled, [t], axis=0) 932 total_var = np.trace(S) 933 for i in range(n_components): 934 pc_percentvar = 100 * svd.singular_values_[i] / total_var 935 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 936 return T 937 938 S, W = run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file) 939 T = compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components) 940 return T 941 942 def _nonmissing_pca_ils(self, X_incomplete, covariance_matrix_file, n_components): 943 944 def run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file): 945 start_time = time.time() 946 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 947 X_incomplete_nonmissing = X_incomplete[nonmissing] 948 X_incomplete_nonmissing = np.ma.array(X_incomplete_nonmissing, mask=np.isnan(X_incomplete_nonmissing)) 949 S_nonmissing, _, _ = self._cov(X_incomplete_nonmissing) 950 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 951 S_nonmissing = self._demean(S_nonmissing, np.ones(S_nonmissing.shape[0]) / S_nonmissing.shape[0]) 952 if covariance_matrix_file: 953 np.save(covariance_matrix_file, S_nonmissing.data) 954 return S_nonmissing 955 956 def compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components): 957 S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))) 958 S = self._demean(S, np.ones(S.shape[0]) / S.shape[0]) 959 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 960 missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0) 961 missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True) 962 svd = TruncatedSVD(n_components, algorithm="arpack") 963 svd.fit(S_nonmissing) 964 U_nonmissing = svd.components_.T 965 D_nonmissing = np.diag(np.sqrt(svd.singular_values_)) 966 T = np.zeros((X_incomplete.shape[0], n_components)) 967 T_filled = np.matmul(U_nonmissing, D_nonmissing) 968 i_filled = nonmissing 969 T[nonmissing] = T_filled 970 for i in missing_sorted: 971 S_i = S[i][i_filled] 972 W_i = np.diag(strength_mat[i][i_filled]) 973 A = np.matmul(W_i, T_filled) 974 b = np.matmul(W_i, S_i) 975 t = np.linalg.lstsq(A, b, rcond=None)[0] 976 T[i] = t 977 i_filled = np.append(i_filled, i) 978 T_filled = np.append(T_filled, [t], axis=0) 979 total_var = np.trace(S) 980 for i in range(n_components): 981 pc_percentvar = 100 * svd.singular_values_[i] / total_var 982 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 983 return T 984 985 S_nonmissing = run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file) 986 T = compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components) 987 return T 988 989 def _run_cov_matrix(self, X_incomplete, weights): 990 """ 991 Runs the specified PCA method on the incomplete data. 992 """ 993 if self.method == "weighted_cov_pca": 994 return self._weighted_cov_pca(X_incomplete, weights, self.covariance_matrix_file, self.n_components) 995 elif self.method == "regularized_optimization_ils": 996 return self._regularized_optimization_ils(X_incomplete, self.covariance_matrix_file, self.n_components) 997 elif self.method == "cov_matrix_imputation": 998 return self._cov_matrix_imputation(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components) 999 elif self.method == "cov_matrix_imputation_ils": 1000 return self._cov_matrix_imputation_ils(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components) 1001 elif self.method == "nonmissing_pca_ils": 1002 return self._nonmissing_pca_ils(X_incomplete, self.covariance_matrix_file, self.n_components) 1003 else: 1004 raise ValueError(f"Unsupported method: {self.method}") 1005 1006 def fit_transform( 1007 self, 1008 snpobj: Optional['SNPObject'] = None, 1009 laiobj: Optional['LocalAncestryObject'] = None, 1010 labels_file: Optional[str] = None, 1011 ancestry: Optional[str] = None, 1012 average_strands: Optional[bool] = None 1013 ) -> np.ndarray: 1014 """ 1015 Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data. 1016 1017 This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying 1018 masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these 1019 masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality 1020 while handling missing data as specified. 1021 1022 Args: 1023 snpobj (SNPObject, optional): 1024 A SNPObject instance. 1025 laiobj (LAIObject, optional): 1026 A LAIObject instance. 1027 labels_file (str, optional): 1028 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 1029 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 1030 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 1031 combined into groups, with respective weights. 1032 ancestry (str, optional): 1033 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 1034 average_strands (bool, optional): 1035 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 1036 If None, defaults to `self.average_strands`. 1037 1038 Returns: 1039 **array of shape (n_samples, n_components):** 1040 The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`. 1041 """ 1042 if snpobj is None: 1043 snpobj = self.snpobj 1044 if laiobj is None: 1045 laiobj = self.laiobj 1046 if labels_file is None: 1047 labels_file = self.labels_file 1048 if ancestry is None: 1049 ancestry = self.ancestry 1050 if average_strands is None: 1051 average_strands = self.average_strands 1052 1053 if self.load_masks: 1054 # Load precomputed ancestry-based masked genotype matrix, SNP identifiers, haplotype identifiers, and weights 1055 mask, variants_id, haplotypes, _, weights = self._load_mask_file() 1056 else: 1057 # Process genotype data with optional ancestry-based masking and return the corresponding SNP and individual identifiers 1058 mask, variants_id, haplotypes = process_calldata_gt( 1059 self.snpobj, 1060 self.laiobj, 1061 self.ancestry, 1062 self.average_strands, 1063 self.force_nan_incomplete_strands, 1064 self.is_masked, 1065 self.rsid_or_chrompos 1066 ) 1067 1068 # Process individual genomic labels and weights, aligning them with a masked genotype matrix by 1069 # filtering out low-coverage individuals, reordering data to match the matrix structure, and 1070 # handling group-based adjustments 1071 mask, haplotypes, _, weights = process_labels_weights( 1072 self.labels_file, 1073 mask, 1074 variants_id, 1075 haplotypes, 1076 self.average_strands, 1077 self.ancestry, 1078 self.min_percent_snps, 1079 self.group_snp_frequencies_only, 1080 self.groups_to_remove, 1081 self.is_weighted, 1082 self.save_masks, 1083 self.masks_file 1084 ) 1085 1086 # Call run_cov_matrix with the specified method 1087 self.X_new_ = self._run_cov_matrix( 1088 mask[self.ancestry].T, 1089 weights 1090 ) 1091 1092 self.haplotypes_ = haplotypes 1093 self.variants_id_ = variants_id 1094 1095 return self.X_new_
A class for performing missing data principal component analysis (mdPCA) on SNP data.
The mdPCA class focuses on genotype segments from the ancestry of interest when the is_masked
flag is set to True
. It offers
flexible processing options, allowing either separate handling of masked haplotype strands or combining (averaging) strands into a
single composite representation for each individual. Moreover, the analysis can be performed on individual-level data, group-level SNP
frequencies, or a combination of both.
If the snpobj
, laiobj
, labels_file
, and ancestry
parameters are all provided during instantiation, the fit_transform
method
will be automatically called, applying the specified mdPCA method to transform the data upon instantiation.
31 def __init__( 32 self, 33 method: str = 'weighted_cov_pca', 34 snpobj: Optional['SNPObject'] = None, 35 laiobj: Optional['LocalAncestryObject'] = None, 36 labels_file: Optional[str] = None, 37 ancestry: Optional[Union[int, str]] = None, 38 is_masked: bool = True, 39 average_strands: bool = False, 40 force_nan_incomplete_strands: bool = False, 41 is_weighted: bool = False, 42 groups_to_remove: List[str] = None, 43 min_percent_snps: float = 4, 44 group_snp_frequencies_only: bool = True, 45 save_masks: bool = False, 46 load_masks: bool = False, 47 masks_file: Union[str, pathlib.Path] = 'masks.npz', 48 output_file: Union[str, pathlib.Path] = 'output.tsv', 49 covariance_matrix_file: Optional[str] = None, 50 n_components: int = 2, 51 rsid_or_chrompos: int = 2, 52 percent_vals_masked: float = 0 53 ): 54 """ 55 Args: 56 method (str, default='weighted_cov_pca'): 57 The PCA method to use for dimensionality reduction. Options include: 58 - `'weighted_cov_pca'`: 59 Simple covariance-based PCA, weighted by sample strengths. 60 - `'regularized_optimization_ils'`: 61 Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of 62 missing samples using the original covariance matrix (considering only relevant elements not missing in 63 the original covariance matrix for those samples). 64 - `'cov_matrix_imputation'`: 65 Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values 66 using the Iterative SVD imputation method. 67 - `'cov_matrix_imputation_ils'`: 68 The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space 69 given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just 70 as done in 'regularized_optimization_ils'. 71 - `'nonmissing_pca_ils'`: 72 The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto 73 the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as 74 done in 'regularized_optimization_ils'. 75 snpobj (SNPObject, optional): 76 A SNPObject instance. 77 laiobj (LAIObject, optional): 78 A LAIObject instance. 79 labels_file (str, optional): 80 Path to a `.tsv` file with metadata on individuals, including population labels, optional weights, and groupings. 81 The `indID` column must contain unique individual identifiers matching those in `laiobj` and `snpobj` for proper alignment. 82 The `label` column assigns population groups. If `is_weighted=True`, a `weight` column must be provided, assigning a weight to 83 each individual, where those with a weight of zero are removed. Optional columns include `combination` and `combination_weight` 84 to aggregate individuals into combined groups, where SNP frequencies represent their sequences. The `combination` column assigns 85 each individual to a specific group (0 for no combination, 1 for the first group, 2 for the second, etc.). All members of a group 86 must share the same `label` and `combination_weight`. If `combination_weight` column is not provided, the combinations are 87 assigned a default weight of `1`. Individuals excluded via `groups_to_remove` or those falling below `min_percent_snps` are removed 88 from the analysis. 89 ancestry (int or str, optional): 90 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. The ancestry input can be: 91 - An integer (e.g., 0, 1, 2). 92 - A string representation of an integer (e.g., '0', '1'). 93 - A string matching one of the ancestry map values (e.g., 'Africa'). 94 is_masked (bool, default=True): 95 If `True`, applies ancestry-specific masking to the genotype matrix, retaining only genotype data 96 corresponding to the specified `ancestry`. If `False`, uses the full, unmasked genotype matrix. 97 average_strands (bool, default=False): 98 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 99 force_nan_incomplete_strands (bool): 100 If `True`, sets the result to NaN if either haplotype in a pair is NaN. 101 Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). 102 is_weighted (bool, default=False): 103 If `True`, assigns individual weights from the `weight` column in `labels_file`. Otherwise, all individuals have equal weight of `1`. 104 groups_to_remove (list of str, optional): 105 List with groups to exclude from analysis. Example: ['group1', 'group2']. 106 min_percent_snps (float, default=4): 107 Minimum percentage of SNPs that must be known for an individual and of the ancestry of interest to be included in the analysis. 108 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 109 group_snp_frequencies_only (bool, default=True): 110 If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when `is_weighted` is 111 set to True and a `combination` column is provided in the `labels_file`, meaning individuals are aggregated into groups based on their assigned 112 labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies when 113 `is_weighted` is True and a `combination` column is provided. 114 save_masks (bool, default=False): 115 True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 116 load_masks (bool, default=False): 117 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise. 118 masks_file (str or pathlib.Path, default='masks.npz'): 119 Path to the `.npz` file used for saving/loading masked matrices. 120 output_file (str or pathlib.Path, default='output.tsv'): 121 Path to the output `.tsv` file where mdPCA results are saved. 122 covariance_matrix_file (str, optional): 123 Path to save the covariance matrix file in `.npy` format. If None, the covariance matrix is not saved. Default is None. 124 n_components (int, default=2): 125 The number of principal components. 126 rsid_or_chrompos (int, default=2): 127 Format indicator for SNP IDs in `self.__X_new_`. Use 1 for `rsID` format or 2 for `chromosome_position`. 128 percent_vals_masked (float, default=0): 129 Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 130 `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`. 131 """ 132 self.__snpobj = snpobj 133 self.__laiobj = laiobj 134 self.__labels_file = labels_file 135 self.__ancestry = self._define_ancestry(ancestry, laiobj.ancestry_map) 136 self.__method = method 137 self.__is_masked = is_masked 138 self.__average_strands = average_strands 139 self.__force_nan_incomplete_strands = force_nan_incomplete_strands 140 self.__is_weighted = is_weighted 141 self.__groups_to_remove = groups_to_remove 142 self.__min_percent_snps = min_percent_snps 143 self.__group_snp_frequencies_only = group_snp_frequencies_only 144 self.__save_masks = save_masks 145 self.__load_masks = load_masks 146 self.__masks_file = masks_file 147 self.__output_file = output_file 148 self.__covariance_matrix_file = covariance_matrix_file 149 self.__n_components = n_components 150 self.__rsid_or_chrompos = rsid_or_chrompos 151 self.__percent_vals_masked = percent_vals_masked 152 self.__X_new_ = None # Store transformed SNP data 153 self.__haplotypes_ = None # Store haplotypes of X_new_ (after filtering if min_percent_snps > 0) 154 self.__samples_ = None # Store samples of X_new_ (after filtering if min_percent_snps > 0) 155 self.__variants_id_ = None # Store variants ID (after filtering SNPs not in laiobj) 156 157 # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided 158 if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None: 159 self.fit_transform(snpobj, laiobj, labels_file, ancestry)
Arguments:
- method (str, default='weighted_cov_pca'): The PCA method to use for dimensionality reduction. Options include:
'weighted_cov_pca'
: Simple covariance-based PCA, weighted by sample strengths.'regularized_optimization_ils'
: Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of missing samples using the original covariance matrix (considering only relevant elements not missing in the original covariance matrix for those samples).'cov_matrix_imputation'
: Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values using the Iterative SVD imputation method.'cov_matrix_imputation_ils'
: The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just as done in 'regularized_optimization_ils'.'nonmissing_pca_ils'
: The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as done in 'regularized_optimization_ils'.
- snpobj (SNPObject, optional): A SNPObject instance.
- laiobj (LAIObject, optional): A LAIObject instance.
- labels_file (str, optional): Path to a
.tsv
file with metadata on individuals, including population labels, optional weights, and groupings. TheindID
column must contain unique individual identifiers matching those inlaiobj
andsnpobj
for proper alignment. Thelabel
column assigns population groups. Ifis_weighted=True
, aweight
column must be provided, assigning a weight to each individual, where those with a weight of zero are removed. Optional columns includecombination
andcombination_weight
to aggregate individuals into combined groups, where SNP frequencies represent their sequences. Thecombination
column assigns each individual to a specific group (0 for no combination, 1 for the first group, 2 for the second, etc.). All members of a group must share the samelabel
andcombination_weight
. Ifcombination_weight
column is not provided, the combinations are assigned a default weight of1
. Individuals excluded viagroups_to_remove
or those falling belowmin_percent_snps
are removed from the analysis. - ancestry (int or str, optional): Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at
0
. The ancestry input can be:- An integer (e.g., 0, 1, 2).
- A string representation of an integer (e.g., '0', '1').
- A string matching one of the ancestry map values (e.g., 'Africa').
- is_masked (bool, default=True): If
True
, applies ancestry-specific masking to the genotype matrix, retaining only genotype data corresponding to the specifiedancestry
. IfFalse
, uses the full, unmasked genotype matrix. - average_strands (bool, default=False): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
- force_nan_incomplete_strands (bool): If
True
, sets the result to NaN if either haplotype in a pair is NaN. Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). - is_weighted (bool, default=False): If
True
, assigns individual weights from theweight
column inlabels_file
. Otherwise, all individuals have equal weight of1
. - groups_to_remove (list of str, optional): List with groups to exclude from analysis. Example: ['group1', 'group2'].
- min_percent_snps (float, default=4): Minimum percentage of SNPs that must be known for an individual and of the ancestry of interest to be included in the analysis. All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
- group_snp_frequencies_only (bool, default=True): If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when
is_weighted
is set to True and acombination
column is provided in thelabels_file
, meaning individuals are aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies whenis_weighted
is True and acombination
column is provided. - save_masks (bool, default=False): True if the masked matrices are to be saved in a
.npz
file, or False otherwise. - load_masks (bool, default=False): True if the masked matrices are to be loaded from a pre-existing
.npz
file specified bymasks_file
, or False otherwise. - masks_file (str or pathlib.Path, default='masks.npz'): Path to the
.npz
file used for saving/loading masked matrices. - output_file (str or pathlib.Path, default='output.tsv'): Path to the output
.tsv
file where mdPCA results are saved. - covariance_matrix_file (str, optional): Path to save the covariance matrix file in
.npy
format. If None, the covariance matrix is not saved. Default is None. - n_components (int, default=2): The number of principal components.
- rsid_or_chrompos (int, default=2): Format indicator for SNP IDs in
self.__X_new_
. Use 1 forrsID
format or 2 forchromosome_position
. - percent_vals_masked (float, default=0): Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if
method
is'cov_matrix_imputation'
or'cov_matrix_imputation_ils'
.
249 @property 250 def ancestry(self) -> Optional[str]: 251 """ 252 Retrieve `ancestry`. 253 254 Returns: 255 **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 256 """ 257 return self.__ancestry
Retrieve ancestry
.
Returns:
str: Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
266 @property 267 def is_masked(self) -> bool: 268 """ 269 Retrieve `is_masked`. 270 271 Returns: 272 **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise. 273 """ 274 return self.__is_masked
Retrieve is_masked
.
Returns:
bool: True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
283 @property 284 def average_strands(self) -> bool: 285 """ 286 Retrieve `average_strands`. 287 288 Returns: 289 **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 290 """ 291 return self.__average_strands
Retrieve average_strands
.
Returns:
bool: True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
300 @property 301 def force_nan_incomplete_strands(self) -> bool: 302 """ 303 Retrieve `force_nan_incomplete_strands`. 304 305 Returns: 306 **bool**: If `True`, sets the result to NaN if either haplotype in a pair is NaN. 307 Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). 308 """ 309 return self.__force_nan_incomplete_strands
Retrieve force_nan_incomplete_strands
.
Returns:
bool: If
True
, sets the result to NaN if either haplotype in a pair is NaN. Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1).
318 @property 319 def is_weighted(self) -> bool: 320 """ 321 Retrieve `is_weighted`. 322 323 Returns: 324 **bool:** True if weights are provided in the labels file, or False otherwise. 325 """ 326 return self.__is_weighted
Retrieve is_weighted
.
Returns:
bool: True if weights are provided in the labels file, or False otherwise.
335 @property 336 def groups_to_remove(self) -> Dict[int, List[str]]: 337 """ 338 Retrieve `groups_to_remove`. 339 340 Returns: 341 **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 342 lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 343 """ 344 return self.__groups_to_remove
Retrieve groups_to_remove
.
Returns:
dict of int to list of str: Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are lists of groups to remove for each array. Example:
{1: ['group1', 'group2'], 2: [], 3: ['group3']}
.
353 @property 354 def min_percent_snps(self) -> float: 355 """ 356 Retrieve `min_percent_snps`. 357 358 Returns: 359 **float:** 360 Minimum percentage of SNPs that must be known for an individual to be included in the analysis. 361 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 362 """ 363 return self.__min_percent_snps
Retrieve min_percent_snps
.
Returns:
float: Minimum percentage of SNPs that must be known for an individual to be included in the analysis. All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
372 @property 373 def group_snp_frequencies_only(self) -> bool: 374 """ 375 Retrieve `group_snp_frequencies_only`. 376 377 Returns: 378 **bool:** 379 If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies 380 when `is_weighted` is set to True and a `combination` column is provided in the `labels_file`, meaning individuals are 381 aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone 382 or on both individual-level and group-level SNP frequencies when `is_weighted` is True and a `combination` column is provided. 383 """ 384 return self.__group_snp_frequencies_only
Retrieve group_snp_frequencies_only
.
Returns:
bool: If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when
is_weighted
is set to True and acombination
column is provided in thelabels_file
, meaning individuals are aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies whenis_weighted
is True and acombination
column is provided.
393 @property 394 def save_masks(self) -> bool: 395 """ 396 Retrieve `save_masks`. 397 398 Returns: 399 **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 400 """ 401 return self.__save_masks
Retrieve save_masks
.
Returns:
bool: True if the masked matrices are to be saved in a
.npz
file, or False otherwise.
410 @property 411 def load_masks(self) -> bool: 412 """ 413 Retrieve `load_masks`. 414 415 Returns: 416 **bool:** 417 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 418 by `masks_file`, or False otherwise. 419 """ 420 return self.__load_masks
Retrieve load_masks
.
Returns:
bool: True if the masked matrices are to be loaded from a pre-existing
.npz
file specified bymasks_file
, or False otherwise.
429 @property 430 def masks_file(self) -> Union[str, pathlib.Path]: 431 """ 432 Retrieve `masks_file`. 433 434 Returns: 435 **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices. 436 """ 437 return self.__masks_file
Retrieve masks_file
.
Returns:
str or pathlib.Path: Path to the
.npz
file used for saving/loading masked matrices.
446 @property 447 def output_file(self) -> Union[str, pathlib.Path]: 448 """ 449 Retrieve `output_file`. 450 451 Returns: 452 **str or pathlib.Path:** Path to the output `.tsv` file where mdPCA results are saved. 453 """ 454 return self.__output_file
Retrieve output_file
.
Returns:
str or pathlib.Path: Path to the output
.tsv
file where mdPCA results are saved.
463 @property 464 def covariance_matrix_file(self) -> Optional[str]: 465 """ 466 Retrieve `covariance_matrix_file`. 467 468 Returns: 469 **str:** Path to save the covariance matrix file in `.npy` format. 470 """ 471 return self.__covariance_matrix_file
Retrieve covariance_matrix_file
.
Returns:
str: Path to save the covariance matrix file in
.npy
format.
497 @property 498 def rsid_or_chrompos(self) -> int: 499 """ 500 Retrieve `rsid_or_chrompos`. 501 502 Returns: 503 **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 504 """ 505 return self.__rsid_or_chrompos
Retrieve rsid_or_chrompos
.
Returns:
int: Format indicator for SNP IDs in the SNP data. Use 1 for
rsID
format or 2 forchromosome_position
.
514 @property 515 def percent_vals_masked(self) -> float: 516 """ 517 Retrieve `percent_vals_masked`. 518 519 Returns: 520 **float:** 521 Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 522 `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`. 523 """ 524 return self.__percent_vals_masked
Retrieve percent_vals_masked
.
Returns:
float: Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if
method
is'cov_matrix_imputation'
or'cov_matrix_imputation_ils'
.
533 @property 534 def X_new_(self) -> Optional[np.ndarray]: 535 """ 536 Retrieve `X_new_`. 537 538 Returns: 539 **array of shape (n_samples, n_components):** 540 The transformed SNP data projected onto the `n_components` principal components. 541 n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 542 (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 543 `(n_samples * 2, n_components)`. 544 """ 545 return self.__X_new_
Retrieve X_new_
.
Returns:
array of shape (n_samples, n_components): The transformed SNP data projected onto the
n_components
principal components. n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied (min_percent_snps > 0
). For diploid individuals without filtering, the shape is(n_samples * 2, n_components)
.
554 @property 555 def haplotypes_(self) -> Optional[List[str]]: 556 """ 557 Retrieve `haplotypes_`. 558 559 Returns: 560 list of str: 561 A list of unique haplotype identifiers. 562 """ 563 if isinstance(self.__haplotypes_, np.ndarray): 564 return self.__haplotypes_.ravel().tolist() # Flatten and convert NumPy array to a list 565 elif isinstance(self.__haplotypes_, list): 566 if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray): 567 return self.__haplotypes_[0].ravel().tolist() # Handle list containing a single array 568 return self.__haplotypes_ # Already a flat list 569 elif self.__haplotypes_ is None: 570 return None # If no haplotypes are set 571 else: 572 raise TypeError("`haplotypes_` must be a list or a NumPy array.")
589 @property 590 def samples_(self) -> Optional[List[str]]: 591 """ 592 Retrieve `samples_`. 593 594 Returns: 595 list of str: 596 A list of sample identifiers based on `haplotypes_` and `average_strands`. 597 """ 598 haplotypes = self.haplotypes_ 599 if haplotypes is None: 600 return None 601 if self.__average_strands: 602 return haplotypes 603 else: 604 return [x[:-2] for x in haplotypes]
Retrieve samples_
.
Returns:
list of str: A list of sample identifiers based on
haplotypes_
andaverage_strands
.
606 @property 607 def variants_id_(self) -> Optional[np.ndarray]: 608 """ 609 Retrieve `variants_id_`. 610 611 Returns: 612 **array of shape (n_snp,):** 613 An array containing unique identifiers (IDs) for each SNP, 614 potentially reduced if there are SNPs not present in the `laiobj`. 615 The format will depend on `rsid_or_chrompos`. 616 """ 617 return self.__variants_id_
Retrieve variants_id_
.
Returns:
array of shape (n_snp,): An array containing unique identifiers (IDs) for each SNP, potentially reduced if there are SNPs not present in the
laiobj
. The format will depend onrsid_or_chrompos
.
626 @property 627 def n_haplotypes(self) -> Optional[int]: 628 """ 629 Retrieve `n_haplotypes`. 630 631 Returns: 632 **int:** 633 The total number of haplotypes, potentially reduced if filtering is applied 634 (`min_percent_snps > 0`). 635 """ 636 return len(self.haplotypes_)
Retrieve n_haplotypes
.
Returns:
int: The total number of haplotypes, potentially reduced if filtering is applied (
min_percent_snps > 0
).
638 @property 639 def n_samples(self) -> Optional[int]: 640 """ 641 Retrieve `n_samples`. 642 643 Returns: 644 **int:** 645 The total number of samples, potentially reduced if filtering is applied 646 (`min_percent_snps > 0`). 647 """ 648 return len(np.unique(self.samples_))
Retrieve n_samples
.
Returns:
int: The total number of samples, potentially reduced if filtering is applied (
min_percent_snps > 0
).
650 def copy(self) -> 'mdPCA': 651 """ 652 Create and return a copy of `self`. 653 654 Returns: 655 **mdPCA:** 656 A new instance of the current object. 657 """ 658 return copy.copy(self)
Create and return a copy of self
.
Returns:
mdPCA: A new instance of the current object.
1006 def fit_transform( 1007 self, 1008 snpobj: Optional['SNPObject'] = None, 1009 laiobj: Optional['LocalAncestryObject'] = None, 1010 labels_file: Optional[str] = None, 1011 ancestry: Optional[str] = None, 1012 average_strands: Optional[bool] = None 1013 ) -> np.ndarray: 1014 """ 1015 Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data. 1016 1017 This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying 1018 masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these 1019 masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality 1020 while handling missing data as specified. 1021 1022 Args: 1023 snpobj (SNPObject, optional): 1024 A SNPObject instance. 1025 laiobj (LAIObject, optional): 1026 A LAIObject instance. 1027 labels_file (str, optional): 1028 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 1029 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 1030 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 1031 combined into groups, with respective weights. 1032 ancestry (str, optional): 1033 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 1034 average_strands (bool, optional): 1035 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 1036 If None, defaults to `self.average_strands`. 1037 1038 Returns: 1039 **array of shape (n_samples, n_components):** 1040 The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`. 1041 """ 1042 if snpobj is None: 1043 snpobj = self.snpobj 1044 if laiobj is None: 1045 laiobj = self.laiobj 1046 if labels_file is None: 1047 labels_file = self.labels_file 1048 if ancestry is None: 1049 ancestry = self.ancestry 1050 if average_strands is None: 1051 average_strands = self.average_strands 1052 1053 if self.load_masks: 1054 # Load precomputed ancestry-based masked genotype matrix, SNP identifiers, haplotype identifiers, and weights 1055 mask, variants_id, haplotypes, _, weights = self._load_mask_file() 1056 else: 1057 # Process genotype data with optional ancestry-based masking and return the corresponding SNP and individual identifiers 1058 mask, variants_id, haplotypes = process_calldata_gt( 1059 self.snpobj, 1060 self.laiobj, 1061 self.ancestry, 1062 self.average_strands, 1063 self.force_nan_incomplete_strands, 1064 self.is_masked, 1065 self.rsid_or_chrompos 1066 ) 1067 1068 # Process individual genomic labels and weights, aligning them with a masked genotype matrix by 1069 # filtering out low-coverage individuals, reordering data to match the matrix structure, and 1070 # handling group-based adjustments 1071 mask, haplotypes, _, weights = process_labels_weights( 1072 self.labels_file, 1073 mask, 1074 variants_id, 1075 haplotypes, 1076 self.average_strands, 1077 self.ancestry, 1078 self.min_percent_snps, 1079 self.group_snp_frequencies_only, 1080 self.groups_to_remove, 1081 self.is_weighted, 1082 self.save_masks, 1083 self.masks_file 1084 ) 1085 1086 # Call run_cov_matrix with the specified method 1087 self.X_new_ = self._run_cov_matrix( 1088 mask[self.ancestry].T, 1089 weights 1090 ) 1091 1092 self.haplotypes_ = haplotypes 1093 self.variants_id_ = variants_id 1094 1095 return self.X_new_
Fit the model to the SNP data stored in the provided snpobj
and apply the dimensionality reduction on the same SNP data.
This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality while handling missing data as specified.
Arguments:
- snpobj (SNPObject, optional): A SNPObject instance.
- laiobj (LAIObject, optional): A LAIObject instance.
- labels_file (str, optional): Path to the labels file in .tsv format. The first column,
indID
, contains the individual identifiers, and the second column,label
, specifies the groups for all individuals. Ifis_weighted=True
, aweight
column with individual weights is required. Optionally,combination
andcombination_weight
columns can specify sets of individuals to be combined into groups, with respective weights. - ancestry (str, optional): Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
- average_strands (bool, optional): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
If None, defaults to
self.average_strands
.
Returns:
array of shape (n_samples, n_components): The transformed SNP data projected onto the
n_components
principal components, stored inself.X_new_
.