snputils.processing.mdpca
1import pathlib 2import os 3import time 4import gc 5import logging 6import logging.config 7import numpy as np 8import copy 9from typing import Optional, Dict, List, Union 10from sklearn.decomposition import TruncatedSVD 11 12from snputils.snp.genobj.snpobj import SNPObject 13from snputils.ancestry.genobj.local import LocalAncestryObject 14from ._utils.gen_tools import array_process, process_labels_weights 15from ._utils.iterative_svd import IterativeSVD 16 17 18class mdPCA: 19 """ 20 A class for missing data principal component analysis (mdPCA). 21 22 This class supports both separate and averaged strand processing for SNP data. If the `snpobj`, 23 `laiobj`, `labels_file`, and `ancestry` parameters are all provided during instantiation, 24 the `fit_transform` method will be automatically called, applying the specified mdPCA method to transform 25 the data upon instantiation. 26 """ 27 def __init__( 28 self, 29 method: str = 'weighted_cov_pca', 30 snpobj: Optional['SNPObject'] = None, 31 laiobj: Optional['LocalAncestryObject'] = None, 32 labels_file: Optional[str] = None, 33 ancestry: Optional[str] = None, 34 is_masked: bool = True, 35 prob_thresh: float = 0, 36 average_strands: bool = False, 37 is_weighted: bool = False, 38 groups_to_remove: Dict[int, List[str]] = {}, 39 min_percent_snps: float = 4, 40 save_masks: bool = False, 41 load_masks: bool = False, 42 masks_file: Union[str, pathlib.Path] = 'masks.npz', 43 output_file: Union[str, pathlib.Path] = 'output.tsv', 44 covariance_matrix_file: Optional[str] = None, 45 n_components: int = 2, 46 rsid_or_chrompos: int = 2, 47 percent_vals_masked: float = 0 48 ): 49 """ 50 Args: 51 method (str, default='weighted_cov_pca'): 52 The PCA method to use for dimensionality reduction. Options include: 53 - `'weighted_cov_pca'`: 54 Simple covariance-based PCA, weighted by sample strengths. 55 - `'regularized_optimization_ils'`: 56 Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of 57 missing samples using the original covariance matrix (considering only relevant elements not missing in 58 the original covariance matrix for those samples). 59 - `'cov_matrix_imputation'`: 60 Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values 61 using the Iterative SVD imputation method. 62 - `'cov_matrix_imputation_ils'`: 63 The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space 64 given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just 65 as done in 'regularized_optimization_ils'. 66 - `'nonmissing_pca_ils'`: 67 The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto 68 the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as 69 done in 'regularized_optimization_ils'. 70 snpobj (SNPObject, optional): 71 A SNPObject instance. 72 laiobj (LAIObject, optional): 73 A LAIObject instance. 74 labels_file (str, optional): 75 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 76 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 77 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 78 combined into groups, with respective weights. 79 ancestry (str, optional): 80 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. 81 is_masked (bool, default=True): 82 True if an ancestry file is passed for ancestry-specific masking, or False otherwise. 83 prob_thresh (float, default=0): 84 Minimum probability threshold for a SNP to belong to an ancestry. 85 average_strands (bool, default=False): 86 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 87 is_weighted (bool, default=False): 88 True if weights are provided in the labels file, or False otherwise. 89 groups_to_remove (dict of int to list of str, default={}): 90 Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 91 lists of groups to remove for each array. 92 Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 93 min_percent_snps (float, default=4): 94 Minimum percentage of SNPs that must be known for an individual to be included in the analysis. 95 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 96 save_masks (bool, default=False): 97 True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 98 load_masks (bool, default=False): 99 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise. 100 masks_file (str or pathlib.Path, default='masks.npz'): 101 Path to the `.npz` file used for saving/loading masked matrices. 102 output_file (str or pathlib.Path, default='output.tsv'): 103 Path to the output `.tsv` file where mdPCA results are saved. 104 covariance_matrix_file (str, optional): 105 Path to save the covariance matrix file in `.npy` format. If None, the covariance matrix is not saved. Default is None. 106 n_components (int, default=2): 107 The number of principal components. 108 rsid_or_chrompos (int, default=2): 109 Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 110 percent_vals_masked (float, default=0): 111 Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 112 `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`. 113 """ 114 self.__snpobj = snpobj 115 self.__laiobj = laiobj 116 self.__labels_file = labels_file 117 self.__ancestry = ancestry 118 self.__method = method 119 self.__is_masked = is_masked 120 self.__prob_thresh = prob_thresh 121 self.__average_strands = average_strands 122 self.__is_weighted = is_weighted 123 self.__groups_to_remove = groups_to_remove 124 self.__min_percent_snps = min_percent_snps 125 self.__save_masks = save_masks 126 self.__load_masks = load_masks 127 self.__masks_file = masks_file 128 self.__output_file = output_file 129 self.__covariance_matrix_file = covariance_matrix_file 130 self.__n_components = n_components 131 self.__rsid_or_chrompos = rsid_or_chrompos 132 self.__percent_vals_masked = percent_vals_masked 133 self.__X_new_ = None # Store transformed SNP data 134 self.__haplotypes_ = None # Store haplotypes after filtering if min_percent_snps > 0 135 self.__samples_ = None # Store samples after filtering if min_percent_snps > 0 136 137 # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided 138 if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None: 139 self.fit_transform(snpobj, laiobj, labels_file, ancestry) 140 141 def __getitem__(self, key): 142 """ 143 To access an attribute of the class using the square bracket notation, 144 similar to a dictionary. 145 """ 146 try: 147 return getattr(self, key) 148 except AttributeError: 149 raise KeyError(f'Invalid key: {key}') 150 151 def __setitem__(self, key, value): 152 """ 153 To set an attribute of the class using the square bracket notation, 154 similar to a dictionary. 155 """ 156 try: 157 setattr(self, key, value) 158 except AttributeError: 159 raise KeyError(f'Invalid key: {key}') 160 161 @property 162 def method(self) -> str: 163 """ 164 Retrieve `method`. 165 166 Returns: 167 **str:** The PCA method to use for dimensionality reduction. 168 """ 169 return self.__method 170 171 @method.setter 172 def method(self, x: str) -> None: 173 """ 174 Update `method`. 175 """ 176 self.__method = x 177 178 @property 179 def snpobj(self) -> Optional['SNPObject']: 180 """ 181 Retrieve `snpobj`. 182 183 Returns: 184 **SNPObject:** A SNPObject instance. 185 """ 186 return self.__snpobj 187 188 @snpobj.setter 189 def snpobj(self, x: 'SNPObject') -> None: 190 """ 191 Update `snpobj`. 192 """ 193 self.__snpobj = x 194 195 @property 196 def laiobj(self) -> Optional['LocalAncestryObject']: 197 """ 198 Retrieve `laiobj`. 199 200 Returns: 201 **LocalAncestryObject:** A LocalAncestryObject instance. 202 """ 203 return self.__laiobj 204 205 @laiobj.setter 206 def laiobj(self, x: 'LocalAncestryObject') -> None: 207 """ 208 Update `laiobj`. 209 """ 210 self.__laiobj = x 211 212 @property 213 def labels_file(self) -> Optional[str]: 214 """ 215 Retrieve `labels_file`. 216 217 Returns: 218 **str:** Path to the labels file in `.tsv` format. 219 """ 220 return self.__labels_file 221 222 @labels_file.setter 223 def labels_file(self, x: str) -> None: 224 """ 225 Update `labels_file`. 226 """ 227 self.__labels_file = x 228 229 @property 230 def ancestry(self) -> Optional[str]: 231 """ 232 Retrieve `ancestry`. 233 234 Returns: 235 **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 236 """ 237 return self.__ancestry 238 239 @ancestry.setter 240 def ancestry(self, x: str) -> None: 241 """ 242 Update `ancestry`. 243 """ 244 self.__ancestry = x 245 246 @property 247 def is_masked(self) -> bool: 248 """ 249 Retrieve `is_masked`. 250 251 Returns: 252 **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise. 253 """ 254 return self.__is_masked 255 256 @is_masked.setter 257 def is_masked(self, x: bool) -> None: 258 """ 259 Update `is_masked`. 260 """ 261 self.__is_masked = x 262 263 @property 264 def prob_thresh(self) -> float: 265 """ 266 Retrieve `prob_thresh`. 267 268 Returns: 269 **float:** Minimum probability threshold for a SNP to belong to an ancestry. 270 """ 271 return self.__prob_thresh 272 273 @prob_thresh.setter 274 def prob_thresh(self, x: float) -> None: 275 """Update `prob_thresh`. 276 """ 277 self.__prob_thresh = x 278 279 @property 280 def average_strands(self) -> bool: 281 """ 282 Retrieve `average_strands`. 283 284 Returns: 285 **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 286 """ 287 return self.__average_strands 288 289 @average_strands.setter 290 def average_strands(self, x: bool) -> None: 291 """ 292 Update `average_strands`. 293 """ 294 self.__average_strands = x 295 296 @property 297 def is_weighted(self) -> bool: 298 """ 299 Retrieve `is_weighted`. 300 301 Returns: 302 **bool:** True if weights are provided in the labels file, or False otherwise. 303 """ 304 return self.__is_weighted 305 306 @is_weighted.setter 307 def is_weighted(self, x: bool) -> None: 308 """ 309 Update `is_weighted`. 310 """ 311 self.__is_weighted = x 312 313 @property 314 def groups_to_remove(self) -> Dict[int, List[str]]: 315 """ 316 Retrieve `groups_to_remove`. 317 318 Returns: 319 **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 320 lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 321 """ 322 return self.__groups_to_remove 323 324 @groups_to_remove.setter 325 def groups_to_remove(self, x: Dict[int, List[str]]) -> None: 326 """ 327 Update `groups_to_remove`. 328 """ 329 self.__groups_to_remove = x 330 331 @property 332 def min_percent_snps(self) -> float: 333 """ 334 Retrieve `min_percent_snps`. 335 336 Returns: 337 **float:** 338 Minimum percentage of SNPs that must be known for an individual to be included in the analysis. 339 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 340 """ 341 return self.__min_percent_snps 342 343 @min_percent_snps.setter 344 def min_percent_snps(self, x: float) -> None: 345 """ 346 Update `min_percent_snps`. 347 """ 348 self.__min_percent_snps = x 349 350 @property 351 def save_masks(self) -> bool: 352 """ 353 Retrieve `save_masks`. 354 355 Returns: 356 **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 357 """ 358 return self.__save_masks 359 360 @save_masks.setter 361 def save_masks(self, x: bool) -> None: 362 """ 363 Update `save_masks`. 364 """ 365 self.__save_masks = x 366 367 @property 368 def load_masks(self) -> bool: 369 """ 370 Retrieve `load_masks`. 371 372 Returns: 373 **bool:** 374 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 375 by `masks_file`, or False otherwise. 376 """ 377 return self.__load_masks 378 379 @load_masks.setter 380 def load_masks(self, x: bool) -> None: 381 """ 382 Update `load_masks`. 383 """ 384 self.__load_masks = x 385 386 @property 387 def masks_file(self) -> Union[str, pathlib.Path]: 388 """ 389 Retrieve `masks_file`. 390 391 Returns: 392 **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices. 393 """ 394 return self.__masks_file 395 396 @masks_file.setter 397 def masks_file(self, x: Union[str, pathlib.Path]) -> None: 398 """ 399 Update `masks_file`. 400 """ 401 self.__masks_file = x 402 403 @property 404 def output_file(self) -> Union[str, pathlib.Path]: 405 """ 406 Retrieve `output_file`. 407 408 Returns: 409 **str or pathlib.Path:** Path to the output `.tsv` file where mdPCA results are saved. 410 """ 411 return self.__output_file 412 413 @output_file.setter 414 def output_file(self, x: Union[str, pathlib.Path]) -> None: 415 """ 416 Update `output_file`. 417 """ 418 self.__output_file = x 419 420 @property 421 def covariance_matrix_file(self) -> Optional[str]: 422 """ 423 Retrieve `covariance_matrix_file`. 424 425 Returns: 426 **str:** Path to save the covariance matrix file in `.npy` format. 427 """ 428 return self.__covariance_matrix_file 429 430 @covariance_matrix_file.setter 431 def covariance_matrix_file(self, x: Optional[str]) -> None: 432 """ 433 Update `covariance_matrix_file`. 434 """ 435 self.__covariance_matrix_file = x 436 437 @property 438 def n_components(self) -> int: 439 """ 440 Retrieve `n_components`. 441 442 Returns: 443 **int:** The number of principal components. 444 """ 445 return self.__n_components 446 447 @n_components.setter 448 def n_components(self, x: int) -> None: 449 """ 450 Update `n_components`. 451 """ 452 self.__n_components = x 453 454 @property 455 def rsid_or_chrompos(self) -> int: 456 """ 457 Retrieve `rsid_or_chrompos`. 458 459 Returns: 460 **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 461 """ 462 return self.__rsid_or_chrompos 463 464 @rsid_or_chrompos.setter 465 def rsid_or_chrompos(self, x: int) -> None: 466 """ 467 Update `rsid_or_chrompos`. 468 """ 469 self.__rsid_or_chrompos = x 470 471 @property 472 def percent_vals_masked(self) -> float: 473 """ 474 Retrieve `percent_vals_masked`. 475 476 Returns: 477 **float:** 478 Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 479 `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`. 480 """ 481 return self.__percent_vals_masked 482 483 @percent_vals_masked.setter 484 def percent_vals_masked(self, x: float) -> None: 485 """ 486 Update `percent_vals_masked`. 487 """ 488 self.__percent_vals_masked = x 489 490 @property 491 def X_new_(self) -> Optional[np.ndarray]: 492 """ 493 Retrieve `X_new_`. 494 495 Returns: 496 **array of shape (n_samples, n_components):** 497 The transformed SNP data projected onto the `n_components` principal components. 498 n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 499 (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 500 `(n_samples * 2, n_components)`. 501 """ 502 return self.__X_new_ 503 504 @X_new_.setter 505 def X_new_(self, x: np.ndarray) -> None: 506 """ 507 Update `X_new_`. 508 """ 509 self.__X_new_ = x 510 511 @property 512 def haplotypes_(self) -> Optional[List[str]]: 513 """ 514 Retrieve `haplotypes_`. 515 516 Returns: 517 list of str: 518 A list of unique haplotype identifiers. 519 """ 520 if isinstance(self.__haplotypes_, np.ndarray): 521 return self.__haplotypes_.ravel().tolist() # Flatten and convert NumPy array to a list 522 elif isinstance(self.__haplotypes_, list): 523 if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray): 524 return self.__haplotypes_[0].ravel().tolist() # Handle list containing a single array 525 return self.__haplotypes_ # Already a flat list 526 elif self.__haplotypes_ is None: 527 return None # If no haplotypes are set 528 else: 529 raise TypeError("`haplotypes_` must be a list or a NumPy array.") 530 531 @haplotypes_.setter 532 def haplotypes_(self, x: Union[np.ndarray, List[str]]) -> None: 533 """ 534 Update `haplotypes_`. 535 """ 536 if isinstance(x, np.ndarray): 537 self.__haplotypes_ = x.ravel().tolist() # Flatten and convert to a list 538 elif isinstance(x, list): 539 if len(x) == 1 and isinstance(x[0], np.ndarray): # Handle list containing a single array 540 self.__haplotypes_ = x[0].ravel().tolist() 541 else: 542 self.__haplotypes_ = x # Use directly if already a list 543 else: 544 raise TypeError("`x` must be a list or a NumPy array.") 545 546 @property 547 def samples_(self) -> Optional[List[str]]: 548 """ 549 Retrieve `samples_`. 550 551 Returns: 552 list of str: 553 A list of sample identifiers based on `haplotypes_` and `average_strands`. 554 """ 555 haplotypes = self.haplotypes_ 556 if haplotypes is None: 557 return None 558 if self.__average_strands: 559 return haplotypes 560 else: 561 return [x[:-2] for x in haplotypes] 562 563 @property 564 def n_haplotypes(self) -> Optional[int]: 565 """ 566 Retrieve `n_haplotypes`. 567 568 Returns: 569 **int:** 570 The total number of haplotypes, potentially reduced if filtering is applied 571 (`min_percent_snps > 0`). 572 """ 573 return len(self.haplotypes_) 574 575 @property 576 def n_samples(self) -> Optional[int]: 577 """ 578 Retrieve `n_samples`. 579 580 Returns: 581 **int:** 582 The total number of samples, potentially reduced if filtering is applied 583 (`min_percent_snps > 0`). 584 """ 585 return len(np.unique(self.samples_)) 586 587 def copy(self) -> 'mdPCA': 588 """ 589 Create and return a copy of `self`. 590 591 Returns: 592 **mdPCA:** 593 A new instance of the current object. 594 """ 595 return copy.copy(self) 596 597 def _process_masks(self, masks, rs_ID_list, ind_ID_list): 598 masked_matrix = masks[0][self.ancestry].T 599 rs_IDs = rs_ID_list[0] 600 ind_IDs = ind_ID_list[0] 601 return masked_matrix, rs_IDs, ind_IDs 602 603 def _load_mask_file(self): 604 mask_files = np.load(self.masks_file, allow_pickle=True) 605 masks = mask_files['masks'] 606 rs_ID_list = mask_files['rs_ID_list'] 607 ind_ID_list = mask_files['ind_ID_list'] 608 labels = mask_files['labels'] 609 weights = mask_files['weights'] 610 return masks, rs_ID_list, ind_ID_list, labels, weights 611 612 @staticmethod 613 def _compute_strength_vector(X): 614 strength_vector = np.sum(~np.isnan(X), axis=1) / X.shape[1] 615 return strength_vector 616 617 @staticmethod 618 def _compute_strength_matrix(X): 619 notmask = (~np.isnan(X)).astype(np.float32) 620 strength_matrix = np.dot(notmask, notmask.T) 621 strength_matrix /= X.shape[1] 622 return strength_matrix 623 624 def _cov(self, x): 625 ddof = 1 626 x = np.ma.array(x, ndmin=2, copy=True, dtype=np.float32) 627 xmask = np.ma.getmaskarray(x) 628 xnotmask = np.logical_not(xmask).astype(np.float32) 629 fact = np.dot(xnotmask, xnotmask.T) * 1. - ddof 630 del xnotmask 631 gc.collect() 632 result = (np.ma.dot(x, x.T, strict=False) / fact).squeeze() 633 x = x.data 634 strength_vec = self._compute_strength_vector(x) 635 strength_mat = self._compute_strength_matrix(x) 636 return result.data, strength_vec, strength_mat 637 638 @staticmethod 639 def _demean(S, w): 640 w_sum = np.matmul(w, S) 641 w_rowsum = w_sum.reshape((1, S.shape[0])) 642 w_colsum = w_sum.reshape((S.shape[0], 1)) 643 w_totalsum = np.dot(w_sum, w) 644 S -= (w_rowsum + w_colsum) - w_totalsum 645 return S 646 647 @staticmethod 648 def _iterative_svd_matrix_completion(cov): 649 num_masked = np.isnan(cov).sum() 650 if num_masked > 0: 651 start_time = time.time() 652 start_rank = 1 653 end_rank = 10 654 rank = 5 655 choose_best = True 656 num_cores = 5 657 cov_complete = IterativeSVD(start_rank=start_rank, end_rank=end_rank, rank=rank, choose_best=choose_best, num_cores=num_cores).fit_transform(cov) 658 logging.info("Iterative SVD --- %s seconds ---" % (time.time() - start_time)) 659 else: 660 cov_complete = cov 661 return cov_complete 662 663 def _weighted_cov_pca(self, X_incomplete, weights, covariance_matrix_file, n_components): 664 start_time = time.time() 665 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 666 S, _, _ = self._cov(X_incomplete) 667 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 668 weights_normalized = weights / weights.sum() 669 S = self._demean(S, weights_normalized) 670 W = np.diag(weights) 671 WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W)) 672 if covariance_matrix_file: 673 np.save(covariance_matrix_file, S.data) 674 svd = TruncatedSVD(n_components, algorithm="arpack") 675 svd.fit(WSW) 676 U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T 677 D = np.diag(np.sqrt(svd.singular_values_)) 678 T = np.matmul(U, D) 679 total_var = np.trace(S) 680 for i in range(n_components): 681 pc_percentvar = 100 * svd.singular_values_[i] / total_var 682 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 683 return T 684 685 @staticmethod 686 def _create_validation_mask(X_incomplete, percent_inds): 687 np.random.seed(1) 688 masked_rows = np.isnan(X_incomplete).any(axis=1) 689 masked_inds = np.flatnonzero(masked_rows) 690 X_masked = X_incomplete[masked_rows] 691 percent_masked = 100 * np.isnan(X_masked).sum() / (X_masked.shape[0] * X_masked.shape[1]) 692 unmasked_rows = ~masked_rows 693 X_unmasked = X_incomplete[unmasked_rows] 694 masked_rows = np.random.choice(range(X_unmasked.shape[0]), size=int(X_unmasked.shape[0] * percent_inds / 100), replace=False) 695 X_masked_rows = X_unmasked[masked_rows,:] 696 mask = np.zeros(X_masked_rows.shape[0] * X_masked_rows.shape[1], dtype=np.int8) 697 mask[:int(X_masked_rows.shape[0] * X_masked_rows.shape[1] * percent_masked / 100)] = 1 698 np.random.shuffle(mask) 699 mask = mask.astype(bool) 700 mask = mask.reshape(X_masked_rows.shape) 701 X_masked_rows[mask] = np.nan 702 X_unmasked[masked_rows] = X_masked_rows 703 X_incomplete[unmasked_rows] = X_unmasked 704 masked_rows_new = np.isnan(X_incomplete).any(axis=1) 705 masked_inds_new = np.flatnonzero(masked_rows_new) 706 masked_inds_val = sorted(list(set(masked_inds_new) - set(masked_inds))) 707 return X_incomplete, masked_inds_val 708 709 @staticmethod 710 def _create_cov_mask(cov, strength_mat, percent_vals_masked): 711 cov_flattened = cov.reshape(-1).copy() 712 num_vals_masked = int(percent_vals_masked * len(cov_flattened) / 100) 713 pos_masked = strength_mat.reshape(-1).argsort()[:num_vals_masked] 714 cov_flattened[pos_masked] = np.nan 715 cov_masked = cov_flattened.reshape(cov.shape) 716 return cov_masked 717 718 def _regularized_optimization_ils(self, X_incomplete, covariance_matrix_file, n_components): 719 def run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file): 720 start_time = time.time() 721 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 722 has_missing = np.isnan(X_incomplete.data).sum() > 0 723 S, _, strength_mat = self._cov(X_incomplete) 724 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 725 S = self._demean(S, np.ones(S.shape[0]) / S.shape[0]) 726 if has_missing: 727 logging.info("Starting matrix completion. This will take a few minutes...") 728 start_time = time.time() 729 percent_inds_val = 20 # Percent of unmasked individuals to be masked for cross-validation 730 X_incomplete, masked_inds_val = self._create_validation_mask(X_incomplete.data, percent_inds_val) # masked_inds_val is the list of indices of the individuals masked for validation 731 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 732 S_prime, _, w_mat_prime = self._cov(X_incomplete) 733 del X_incomplete 734 gc.collect() 735 S_prime = self._demean(S_prime, np.ones(S_prime.shape[0]) / S_prime.shape[0]) 736 S_robust, lam = self.matrix_completion(S, strength_mat, S_prime, w_mat_prime, lams=None, method="NN", 737 cv_inds=masked_inds_val) 738 logging.info(f"Covariance Matrix --- %{time.time() - start_time:.2}s seconds ---") 739 else: 740 S_robust = S.copy() 741 if covariance_matrix_file: 742 np.save(covariance_matrix_file, S.data) 743 if has_missing: 744 base, ext = os.path.splitext(covariance_matrix_file) 745 np.save(f"{base}_completed_{lam}{ext}", S_robust.data) 746 return S, S_robust 747 748 def _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components): 749 _, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))) 750 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 751 missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0) 752 missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True) 753 svd = TruncatedSVD(n_components, algorithm="arpack") 754 svd.fit(S_robust) 755 U = svd.components_.T 756 U_nonmissing = U[nonmissing] 757 D_nonmissing = np.diag(np.sqrt(svd.singular_values_)) 758 T = np.zeros((X_incomplete.shape[0], n_components)) 759 T_filled = np.matmul(U_nonmissing, D_nonmissing) 760 i_filled = nonmissing 761 T[nonmissing] = T_filled 762 for i in missing_sorted: 763 S_i = S[i][i_filled] 764 W_i = np.diag(strength_mat[i][i_filled]) 765 A = np.matmul(W_i, T_filled) 766 b = np.matmul(W_i, S_i) 767 t = np.linalg.lstsq(A, b, rcond=None)[0] 768 T[i] = t 769 i_filled = np.append(i_filled, i) 770 T_filled = np.append(T_filled, [t], axis=0) 771 total_var = np.trace(S) 772 for i in range(n_components): 773 pc_percentvar = 100 * svd.singular_values_[i] / total_var 774 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 775 return T 776 777 S, S_robust = run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file) 778 T = _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components) 779 return T 780 781 def _cov_matrix_imputation(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components): 782 start_time = time.time() 783 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 784 S, strength_vec, strength_mat = self._cov(X_incomplete) 785 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 786 S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked) 787 S = self._iterative_svd_matrix_completion(S_masked) 788 weights = np.ones(len(strength_vec)) 789 weights_normalized = weights / weights.sum() 790 S = self._demean(S, weights_normalized) 791 W = np.diag(weights) 792 WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W)) 793 if covariance_matrix_file: 794 np.save(covariance_matrix_file, S.data) 795 svd = TruncatedSVD(n_components, algorithm="arpack") 796 svd.fit(WSW) 797 U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T 798 D = np.diag(np.sqrt(svd.singular_values_)) 799 T = np.matmul(U, D) 800 total_var = np.trace(S) 801 for i in range(n_components): 802 pc_percentvar = 100 * svd.singular_values_[i] / total_var 803 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 804 return T 805 806 def _cov_matrix_imputation_ils(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components): 807 808 def run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file): 809 start_time = time.time() 810 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 811 S, strength_vec, strength_mat = self._cov(X_incomplete) 812 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 813 S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked) 814 S = self._iterative_svd_matrix_completion(S_masked) 815 weights = np.ones(len(strength_vec)) 816 weights_normalized = weights / weights.sum() 817 S = self._demean(S, weights_normalized) 818 W = np.diag(weights) 819 if covariance_matrix_file: 820 np.save(covariance_matrix_file, S.data) 821 return S, W 822 823 def compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components): 824 S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))) 825 S = self._demean(S, np.ones(S.shape[0]) / S.shape[0]) 826 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 827 missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0) 828 missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True) 829 svd = TruncatedSVD(n_components, algorithm="arpack") 830 WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W)) 831 svd.fit(WSW) 832 U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T 833 U_nonmissing = U[nonmissing] 834 D_nonmissing = np.diag(np.sqrt(svd.singular_values_)) 835 T = np.zeros((X_incomplete.shape[0], n_components)) 836 T_filled = np.matmul(U_nonmissing, D_nonmissing) 837 i_filled = nonmissing 838 T[nonmissing] = T_filled 839 for i in missing_sorted: 840 S_i = S[i][i_filled] 841 W_i = np.diag(strength_mat[i][i_filled]) 842 A = np.matmul(W_i, T_filled) 843 b = np.matmul(W_i, S_i) 844 t = np.linalg.lstsq(A, b, rcond=None)[0] 845 T[i] = t 846 i_filled = np.append(i_filled, i) 847 T_filled = np.append(T_filled, [t], axis=0) 848 total_var = np.trace(S) 849 for i in range(n_components): 850 pc_percentvar = 100 * svd.singular_values_[i] / total_var 851 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 852 return T 853 854 S, W = run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file) 855 T = compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components) 856 return T 857 858 def _nonmissing_pca_ils(self, X_incomplete, covariance_matrix_file, n_components): 859 860 def run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file): 861 start_time = time.time() 862 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 863 X_incomplete_nonmissing = X_incomplete[nonmissing] 864 X_incomplete_nonmissing = np.ma.array(X_incomplete_nonmissing, mask=np.isnan(X_incomplete_nonmissing)) 865 S_nonmissing, _, _ = self._cov(X_incomplete_nonmissing) 866 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 867 S_nonmissing = self._demean(S_nonmissing, np.ones(S_nonmissing.shape[0]) / S_nonmissing.shape[0]) 868 if covariance_matrix_file: 869 np.save(covariance_matrix_file, S_nonmissing.data) 870 return S_nonmissing 871 872 def compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components): 873 S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))) 874 S = self._demean(S, np.ones(S.shape[0]) / S.shape[0]) 875 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 876 missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0) 877 missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True) 878 svd = TruncatedSVD(n_components, algorithm="arpack") 879 svd.fit(S_nonmissing) 880 U_nonmissing = svd.components_.T 881 D_nonmissing = np.diag(np.sqrt(svd.singular_values_)) 882 T = np.zeros((X_incomplete.shape[0], n_components)) 883 T_filled = np.matmul(U_nonmissing, D_nonmissing) 884 i_filled = nonmissing 885 T[nonmissing] = T_filled 886 for i in missing_sorted: 887 S_i = S[i][i_filled] 888 W_i = np.diag(strength_mat[i][i_filled]) 889 A = np.matmul(W_i, T_filled) 890 b = np.matmul(W_i, S_i) 891 t = np.linalg.lstsq(A, b, rcond=None)[0] 892 T[i] = t 893 i_filled = np.append(i_filled, i) 894 T_filled = np.append(T_filled, [t], axis=0) 895 total_var = np.trace(S) 896 for i in range(n_components): 897 pc_percentvar = 100 * svd.singular_values_[i] / total_var 898 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 899 return T 900 901 S_nonmissing = run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file) 902 T = compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components) 903 return T 904 905 def _run_cov_matrix(self, X_incomplete, weights): 906 """ 907 Runs the specified PCA method on the incomplete data. 908 """ 909 if self.method == "weighted_cov_pca": 910 return self._weighted_cov_pca(X_incomplete, weights, self.covariance_matrix_file, self.n_components) 911 elif self.method == "regularized_optimization_ils": 912 return self._regularized_optimization_ils(X_incomplete, self.covariance_matrix_file, self.n_components) 913 elif self.method == "cov_matrix_imputation": 914 return self._cov_matrix_imputation(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components) 915 elif self.method == "cov_matrix_imputation_ils": 916 return self._cov_matrix_imputation_ils(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components) 917 elif self.method == "nonmissing_pca_ils": 918 return self._nonmissing_pca_ils(X_incomplete, self.covariance_matrix_file, self.n_components) 919 else: 920 raise ValueError(f"Unsupported method: {self.method}") 921 922 def fit_transform( 923 self, 924 snpobj: Optional['SNPObject'] = None, 925 laiobj: Optional['LocalAncestryObject'] = None, 926 labels_file: Optional[str] = None, 927 ancestry: Optional[str] = None, 928 average_strands: Optional[bool] = None 929 ) -> np.ndarray: 930 """ 931 Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data. 932 933 This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying 934 masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these 935 masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality 936 while handling missing data as specified. 937 938 Args: 939 snpobj (SNPObject, optional): 940 A SNPObject instance. 941 laiobj (LAIObject, optional): 942 A LAIObject instance. 943 labels_file (str, optional): 944 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 945 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 946 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 947 combined into groups, with respective weights. 948 ancestry (str, optional): 949 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 950 average_strands (bool, optional): 951 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 952 If None, defaults to `self.average_strands`. 953 954 Returns: 955 **array of shape (n_samples, n_components):** 956 The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`. 957 """ 958 if snpobj is None: 959 snpobj = self.snpobj 960 if laiobj is None: 961 laiobj = self.laiobj 962 if labels_file is None: 963 labels_file = self.labels_file 964 if ancestry is None: 965 ancestry = self.ancestry 966 if average_strands is None: 967 average_strands = self.average_strands 968 969 if self.load_masks: 970 masks, rs_id_list, ind_id_list, _, weights = self._load_mask_file() 971 else: 972 masks, rs_id_list, ind_id_list = array_process( 973 self.snpobj, 974 self.laiobj, 975 self.average_strands, 976 self.prob_thresh, 977 self.is_masked, 978 self.rsid_or_chrompos 979 ) 980 981 masks, ind_id_list, _, weights = process_labels_weights( 982 self.labels_file, 983 masks, 984 rs_id_list, 985 ind_id_list, 986 self.average_strands, 987 self.ancestry, 988 self.min_percent_snps, 989 self.groups_to_remove, 990 self.is_weighted, 991 self.save_masks, 992 self.masks_file 993 ) 994 995 X_incomplete, _, _ = self._process_masks(masks, rs_id_list, ind_id_list) 996 997 # Call run_cov_matrix with the specified method 998 self.X_new_ = self._run_cov_matrix( 999 X_incomplete, 1000 weights 1001 ) 1002 1003 self.haplotypes_ = ind_id_list 1004 1005 return self.X_new_
19class mdPCA: 20 """ 21 A class for missing data principal component analysis (mdPCA). 22 23 This class supports both separate and averaged strand processing for SNP data. If the `snpobj`, 24 `laiobj`, `labels_file`, and `ancestry` parameters are all provided during instantiation, 25 the `fit_transform` method will be automatically called, applying the specified mdPCA method to transform 26 the data upon instantiation. 27 """ 28 def __init__( 29 self, 30 method: str = 'weighted_cov_pca', 31 snpobj: Optional['SNPObject'] = None, 32 laiobj: Optional['LocalAncestryObject'] = None, 33 labels_file: Optional[str] = None, 34 ancestry: Optional[str] = None, 35 is_masked: bool = True, 36 prob_thresh: float = 0, 37 average_strands: bool = False, 38 is_weighted: bool = False, 39 groups_to_remove: Dict[int, List[str]] = {}, 40 min_percent_snps: float = 4, 41 save_masks: bool = False, 42 load_masks: bool = False, 43 masks_file: Union[str, pathlib.Path] = 'masks.npz', 44 output_file: Union[str, pathlib.Path] = 'output.tsv', 45 covariance_matrix_file: Optional[str] = None, 46 n_components: int = 2, 47 rsid_or_chrompos: int = 2, 48 percent_vals_masked: float = 0 49 ): 50 """ 51 Args: 52 method (str, default='weighted_cov_pca'): 53 The PCA method to use for dimensionality reduction. Options include: 54 - `'weighted_cov_pca'`: 55 Simple covariance-based PCA, weighted by sample strengths. 56 - `'regularized_optimization_ils'`: 57 Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of 58 missing samples using the original covariance matrix (considering only relevant elements not missing in 59 the original covariance matrix for those samples). 60 - `'cov_matrix_imputation'`: 61 Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values 62 using the Iterative SVD imputation method. 63 - `'cov_matrix_imputation_ils'`: 64 The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space 65 given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just 66 as done in 'regularized_optimization_ils'. 67 - `'nonmissing_pca_ils'`: 68 The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto 69 the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as 70 done in 'regularized_optimization_ils'. 71 snpobj (SNPObject, optional): 72 A SNPObject instance. 73 laiobj (LAIObject, optional): 74 A LAIObject instance. 75 labels_file (str, optional): 76 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 77 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 78 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 79 combined into groups, with respective weights. 80 ancestry (str, optional): 81 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. 82 is_masked (bool, default=True): 83 True if an ancestry file is passed for ancestry-specific masking, or False otherwise. 84 prob_thresh (float, default=0): 85 Minimum probability threshold for a SNP to belong to an ancestry. 86 average_strands (bool, default=False): 87 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 88 is_weighted (bool, default=False): 89 True if weights are provided in the labels file, or False otherwise. 90 groups_to_remove (dict of int to list of str, default={}): 91 Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 92 lists of groups to remove for each array. 93 Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 94 min_percent_snps (float, default=4): 95 Minimum percentage of SNPs that must be known for an individual to be included in the analysis. 96 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 97 save_masks (bool, default=False): 98 True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 99 load_masks (bool, default=False): 100 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise. 101 masks_file (str or pathlib.Path, default='masks.npz'): 102 Path to the `.npz` file used for saving/loading masked matrices. 103 output_file (str or pathlib.Path, default='output.tsv'): 104 Path to the output `.tsv` file where mdPCA results are saved. 105 covariance_matrix_file (str, optional): 106 Path to save the covariance matrix file in `.npy` format. If None, the covariance matrix is not saved. Default is None. 107 n_components (int, default=2): 108 The number of principal components. 109 rsid_or_chrompos (int, default=2): 110 Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 111 percent_vals_masked (float, default=0): 112 Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 113 `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`. 114 """ 115 self.__snpobj = snpobj 116 self.__laiobj = laiobj 117 self.__labels_file = labels_file 118 self.__ancestry = ancestry 119 self.__method = method 120 self.__is_masked = is_masked 121 self.__prob_thresh = prob_thresh 122 self.__average_strands = average_strands 123 self.__is_weighted = is_weighted 124 self.__groups_to_remove = groups_to_remove 125 self.__min_percent_snps = min_percent_snps 126 self.__save_masks = save_masks 127 self.__load_masks = load_masks 128 self.__masks_file = masks_file 129 self.__output_file = output_file 130 self.__covariance_matrix_file = covariance_matrix_file 131 self.__n_components = n_components 132 self.__rsid_or_chrompos = rsid_or_chrompos 133 self.__percent_vals_masked = percent_vals_masked 134 self.__X_new_ = None # Store transformed SNP data 135 self.__haplotypes_ = None # Store haplotypes after filtering if min_percent_snps > 0 136 self.__samples_ = None # Store samples after filtering if min_percent_snps > 0 137 138 # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided 139 if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None: 140 self.fit_transform(snpobj, laiobj, labels_file, ancestry) 141 142 def __getitem__(self, key): 143 """ 144 To access an attribute of the class using the square bracket notation, 145 similar to a dictionary. 146 """ 147 try: 148 return getattr(self, key) 149 except AttributeError: 150 raise KeyError(f'Invalid key: {key}') 151 152 def __setitem__(self, key, value): 153 """ 154 To set an attribute of the class using the square bracket notation, 155 similar to a dictionary. 156 """ 157 try: 158 setattr(self, key, value) 159 except AttributeError: 160 raise KeyError(f'Invalid key: {key}') 161 162 @property 163 def method(self) -> str: 164 """ 165 Retrieve `method`. 166 167 Returns: 168 **str:** The PCA method to use for dimensionality reduction. 169 """ 170 return self.__method 171 172 @method.setter 173 def method(self, x: str) -> None: 174 """ 175 Update `method`. 176 """ 177 self.__method = x 178 179 @property 180 def snpobj(self) -> Optional['SNPObject']: 181 """ 182 Retrieve `snpobj`. 183 184 Returns: 185 **SNPObject:** A SNPObject instance. 186 """ 187 return self.__snpobj 188 189 @snpobj.setter 190 def snpobj(self, x: 'SNPObject') -> None: 191 """ 192 Update `snpobj`. 193 """ 194 self.__snpobj = x 195 196 @property 197 def laiobj(self) -> Optional['LocalAncestryObject']: 198 """ 199 Retrieve `laiobj`. 200 201 Returns: 202 **LocalAncestryObject:** A LocalAncestryObject instance. 203 """ 204 return self.__laiobj 205 206 @laiobj.setter 207 def laiobj(self, x: 'LocalAncestryObject') -> None: 208 """ 209 Update `laiobj`. 210 """ 211 self.__laiobj = x 212 213 @property 214 def labels_file(self) -> Optional[str]: 215 """ 216 Retrieve `labels_file`. 217 218 Returns: 219 **str:** Path to the labels file in `.tsv` format. 220 """ 221 return self.__labels_file 222 223 @labels_file.setter 224 def labels_file(self, x: str) -> None: 225 """ 226 Update `labels_file`. 227 """ 228 self.__labels_file = x 229 230 @property 231 def ancestry(self) -> Optional[str]: 232 """ 233 Retrieve `ancestry`. 234 235 Returns: 236 **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 237 """ 238 return self.__ancestry 239 240 @ancestry.setter 241 def ancestry(self, x: str) -> None: 242 """ 243 Update `ancestry`. 244 """ 245 self.__ancestry = x 246 247 @property 248 def is_masked(self) -> bool: 249 """ 250 Retrieve `is_masked`. 251 252 Returns: 253 **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise. 254 """ 255 return self.__is_masked 256 257 @is_masked.setter 258 def is_masked(self, x: bool) -> None: 259 """ 260 Update `is_masked`. 261 """ 262 self.__is_masked = x 263 264 @property 265 def prob_thresh(self) -> float: 266 """ 267 Retrieve `prob_thresh`. 268 269 Returns: 270 **float:** Minimum probability threshold for a SNP to belong to an ancestry. 271 """ 272 return self.__prob_thresh 273 274 @prob_thresh.setter 275 def prob_thresh(self, x: float) -> None: 276 """Update `prob_thresh`. 277 """ 278 self.__prob_thresh = x 279 280 @property 281 def average_strands(self) -> bool: 282 """ 283 Retrieve `average_strands`. 284 285 Returns: 286 **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 287 """ 288 return self.__average_strands 289 290 @average_strands.setter 291 def average_strands(self, x: bool) -> None: 292 """ 293 Update `average_strands`. 294 """ 295 self.__average_strands = x 296 297 @property 298 def is_weighted(self) -> bool: 299 """ 300 Retrieve `is_weighted`. 301 302 Returns: 303 **bool:** True if weights are provided in the labels file, or False otherwise. 304 """ 305 return self.__is_weighted 306 307 @is_weighted.setter 308 def is_weighted(self, x: bool) -> None: 309 """ 310 Update `is_weighted`. 311 """ 312 self.__is_weighted = x 313 314 @property 315 def groups_to_remove(self) -> Dict[int, List[str]]: 316 """ 317 Retrieve `groups_to_remove`. 318 319 Returns: 320 **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 321 lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 322 """ 323 return self.__groups_to_remove 324 325 @groups_to_remove.setter 326 def groups_to_remove(self, x: Dict[int, List[str]]) -> None: 327 """ 328 Update `groups_to_remove`. 329 """ 330 self.__groups_to_remove = x 331 332 @property 333 def min_percent_snps(self) -> float: 334 """ 335 Retrieve `min_percent_snps`. 336 337 Returns: 338 **float:** 339 Minimum percentage of SNPs that must be known for an individual to be included in the analysis. 340 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 341 """ 342 return self.__min_percent_snps 343 344 @min_percent_snps.setter 345 def min_percent_snps(self, x: float) -> None: 346 """ 347 Update `min_percent_snps`. 348 """ 349 self.__min_percent_snps = x 350 351 @property 352 def save_masks(self) -> bool: 353 """ 354 Retrieve `save_masks`. 355 356 Returns: 357 **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 358 """ 359 return self.__save_masks 360 361 @save_masks.setter 362 def save_masks(self, x: bool) -> None: 363 """ 364 Update `save_masks`. 365 """ 366 self.__save_masks = x 367 368 @property 369 def load_masks(self) -> bool: 370 """ 371 Retrieve `load_masks`. 372 373 Returns: 374 **bool:** 375 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 376 by `masks_file`, or False otherwise. 377 """ 378 return self.__load_masks 379 380 @load_masks.setter 381 def load_masks(self, x: bool) -> None: 382 """ 383 Update `load_masks`. 384 """ 385 self.__load_masks = x 386 387 @property 388 def masks_file(self) -> Union[str, pathlib.Path]: 389 """ 390 Retrieve `masks_file`. 391 392 Returns: 393 **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices. 394 """ 395 return self.__masks_file 396 397 @masks_file.setter 398 def masks_file(self, x: Union[str, pathlib.Path]) -> None: 399 """ 400 Update `masks_file`. 401 """ 402 self.__masks_file = x 403 404 @property 405 def output_file(self) -> Union[str, pathlib.Path]: 406 """ 407 Retrieve `output_file`. 408 409 Returns: 410 **str or pathlib.Path:** Path to the output `.tsv` file where mdPCA results are saved. 411 """ 412 return self.__output_file 413 414 @output_file.setter 415 def output_file(self, x: Union[str, pathlib.Path]) -> None: 416 """ 417 Update `output_file`. 418 """ 419 self.__output_file = x 420 421 @property 422 def covariance_matrix_file(self) -> Optional[str]: 423 """ 424 Retrieve `covariance_matrix_file`. 425 426 Returns: 427 **str:** Path to save the covariance matrix file in `.npy` format. 428 """ 429 return self.__covariance_matrix_file 430 431 @covariance_matrix_file.setter 432 def covariance_matrix_file(self, x: Optional[str]) -> None: 433 """ 434 Update `covariance_matrix_file`. 435 """ 436 self.__covariance_matrix_file = x 437 438 @property 439 def n_components(self) -> int: 440 """ 441 Retrieve `n_components`. 442 443 Returns: 444 **int:** The number of principal components. 445 """ 446 return self.__n_components 447 448 @n_components.setter 449 def n_components(self, x: int) -> None: 450 """ 451 Update `n_components`. 452 """ 453 self.__n_components = x 454 455 @property 456 def rsid_or_chrompos(self) -> int: 457 """ 458 Retrieve `rsid_or_chrompos`. 459 460 Returns: 461 **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 462 """ 463 return self.__rsid_or_chrompos 464 465 @rsid_or_chrompos.setter 466 def rsid_or_chrompos(self, x: int) -> None: 467 """ 468 Update `rsid_or_chrompos`. 469 """ 470 self.__rsid_or_chrompos = x 471 472 @property 473 def percent_vals_masked(self) -> float: 474 """ 475 Retrieve `percent_vals_masked`. 476 477 Returns: 478 **float:** 479 Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 480 `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`. 481 """ 482 return self.__percent_vals_masked 483 484 @percent_vals_masked.setter 485 def percent_vals_masked(self, x: float) -> None: 486 """ 487 Update `percent_vals_masked`. 488 """ 489 self.__percent_vals_masked = x 490 491 @property 492 def X_new_(self) -> Optional[np.ndarray]: 493 """ 494 Retrieve `X_new_`. 495 496 Returns: 497 **array of shape (n_samples, n_components):** 498 The transformed SNP data projected onto the `n_components` principal components. 499 n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 500 (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 501 `(n_samples * 2, n_components)`. 502 """ 503 return self.__X_new_ 504 505 @X_new_.setter 506 def X_new_(self, x: np.ndarray) -> None: 507 """ 508 Update `X_new_`. 509 """ 510 self.__X_new_ = x 511 512 @property 513 def haplotypes_(self) -> Optional[List[str]]: 514 """ 515 Retrieve `haplotypes_`. 516 517 Returns: 518 list of str: 519 A list of unique haplotype identifiers. 520 """ 521 if isinstance(self.__haplotypes_, np.ndarray): 522 return self.__haplotypes_.ravel().tolist() # Flatten and convert NumPy array to a list 523 elif isinstance(self.__haplotypes_, list): 524 if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray): 525 return self.__haplotypes_[0].ravel().tolist() # Handle list containing a single array 526 return self.__haplotypes_ # Already a flat list 527 elif self.__haplotypes_ is None: 528 return None # If no haplotypes are set 529 else: 530 raise TypeError("`haplotypes_` must be a list or a NumPy array.") 531 532 @haplotypes_.setter 533 def haplotypes_(self, x: Union[np.ndarray, List[str]]) -> None: 534 """ 535 Update `haplotypes_`. 536 """ 537 if isinstance(x, np.ndarray): 538 self.__haplotypes_ = x.ravel().tolist() # Flatten and convert to a list 539 elif isinstance(x, list): 540 if len(x) == 1 and isinstance(x[0], np.ndarray): # Handle list containing a single array 541 self.__haplotypes_ = x[0].ravel().tolist() 542 else: 543 self.__haplotypes_ = x # Use directly if already a list 544 else: 545 raise TypeError("`x` must be a list or a NumPy array.") 546 547 @property 548 def samples_(self) -> Optional[List[str]]: 549 """ 550 Retrieve `samples_`. 551 552 Returns: 553 list of str: 554 A list of sample identifiers based on `haplotypes_` and `average_strands`. 555 """ 556 haplotypes = self.haplotypes_ 557 if haplotypes is None: 558 return None 559 if self.__average_strands: 560 return haplotypes 561 else: 562 return [x[:-2] for x in haplotypes] 563 564 @property 565 def n_haplotypes(self) -> Optional[int]: 566 """ 567 Retrieve `n_haplotypes`. 568 569 Returns: 570 **int:** 571 The total number of haplotypes, potentially reduced if filtering is applied 572 (`min_percent_snps > 0`). 573 """ 574 return len(self.haplotypes_) 575 576 @property 577 def n_samples(self) -> Optional[int]: 578 """ 579 Retrieve `n_samples`. 580 581 Returns: 582 **int:** 583 The total number of samples, potentially reduced if filtering is applied 584 (`min_percent_snps > 0`). 585 """ 586 return len(np.unique(self.samples_)) 587 588 def copy(self) -> 'mdPCA': 589 """ 590 Create and return a copy of `self`. 591 592 Returns: 593 **mdPCA:** 594 A new instance of the current object. 595 """ 596 return copy.copy(self) 597 598 def _process_masks(self, masks, rs_ID_list, ind_ID_list): 599 masked_matrix = masks[0][self.ancestry].T 600 rs_IDs = rs_ID_list[0] 601 ind_IDs = ind_ID_list[0] 602 return masked_matrix, rs_IDs, ind_IDs 603 604 def _load_mask_file(self): 605 mask_files = np.load(self.masks_file, allow_pickle=True) 606 masks = mask_files['masks'] 607 rs_ID_list = mask_files['rs_ID_list'] 608 ind_ID_list = mask_files['ind_ID_list'] 609 labels = mask_files['labels'] 610 weights = mask_files['weights'] 611 return masks, rs_ID_list, ind_ID_list, labels, weights 612 613 @staticmethod 614 def _compute_strength_vector(X): 615 strength_vector = np.sum(~np.isnan(X), axis=1) / X.shape[1] 616 return strength_vector 617 618 @staticmethod 619 def _compute_strength_matrix(X): 620 notmask = (~np.isnan(X)).astype(np.float32) 621 strength_matrix = np.dot(notmask, notmask.T) 622 strength_matrix /= X.shape[1] 623 return strength_matrix 624 625 def _cov(self, x): 626 ddof = 1 627 x = np.ma.array(x, ndmin=2, copy=True, dtype=np.float32) 628 xmask = np.ma.getmaskarray(x) 629 xnotmask = np.logical_not(xmask).astype(np.float32) 630 fact = np.dot(xnotmask, xnotmask.T) * 1. - ddof 631 del xnotmask 632 gc.collect() 633 result = (np.ma.dot(x, x.T, strict=False) / fact).squeeze() 634 x = x.data 635 strength_vec = self._compute_strength_vector(x) 636 strength_mat = self._compute_strength_matrix(x) 637 return result.data, strength_vec, strength_mat 638 639 @staticmethod 640 def _demean(S, w): 641 w_sum = np.matmul(w, S) 642 w_rowsum = w_sum.reshape((1, S.shape[0])) 643 w_colsum = w_sum.reshape((S.shape[0], 1)) 644 w_totalsum = np.dot(w_sum, w) 645 S -= (w_rowsum + w_colsum) - w_totalsum 646 return S 647 648 @staticmethod 649 def _iterative_svd_matrix_completion(cov): 650 num_masked = np.isnan(cov).sum() 651 if num_masked > 0: 652 start_time = time.time() 653 start_rank = 1 654 end_rank = 10 655 rank = 5 656 choose_best = True 657 num_cores = 5 658 cov_complete = IterativeSVD(start_rank=start_rank, end_rank=end_rank, rank=rank, choose_best=choose_best, num_cores=num_cores).fit_transform(cov) 659 logging.info("Iterative SVD --- %s seconds ---" % (time.time() - start_time)) 660 else: 661 cov_complete = cov 662 return cov_complete 663 664 def _weighted_cov_pca(self, X_incomplete, weights, covariance_matrix_file, n_components): 665 start_time = time.time() 666 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 667 S, _, _ = self._cov(X_incomplete) 668 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 669 weights_normalized = weights / weights.sum() 670 S = self._demean(S, weights_normalized) 671 W = np.diag(weights) 672 WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W)) 673 if covariance_matrix_file: 674 np.save(covariance_matrix_file, S.data) 675 svd = TruncatedSVD(n_components, algorithm="arpack") 676 svd.fit(WSW) 677 U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T 678 D = np.diag(np.sqrt(svd.singular_values_)) 679 T = np.matmul(U, D) 680 total_var = np.trace(S) 681 for i in range(n_components): 682 pc_percentvar = 100 * svd.singular_values_[i] / total_var 683 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 684 return T 685 686 @staticmethod 687 def _create_validation_mask(X_incomplete, percent_inds): 688 np.random.seed(1) 689 masked_rows = np.isnan(X_incomplete).any(axis=1) 690 masked_inds = np.flatnonzero(masked_rows) 691 X_masked = X_incomplete[masked_rows] 692 percent_masked = 100 * np.isnan(X_masked).sum() / (X_masked.shape[0] * X_masked.shape[1]) 693 unmasked_rows = ~masked_rows 694 X_unmasked = X_incomplete[unmasked_rows] 695 masked_rows = np.random.choice(range(X_unmasked.shape[0]), size=int(X_unmasked.shape[0] * percent_inds / 100), replace=False) 696 X_masked_rows = X_unmasked[masked_rows,:] 697 mask = np.zeros(X_masked_rows.shape[0] * X_masked_rows.shape[1], dtype=np.int8) 698 mask[:int(X_masked_rows.shape[0] * X_masked_rows.shape[1] * percent_masked / 100)] = 1 699 np.random.shuffle(mask) 700 mask = mask.astype(bool) 701 mask = mask.reshape(X_masked_rows.shape) 702 X_masked_rows[mask] = np.nan 703 X_unmasked[masked_rows] = X_masked_rows 704 X_incomplete[unmasked_rows] = X_unmasked 705 masked_rows_new = np.isnan(X_incomplete).any(axis=1) 706 masked_inds_new = np.flatnonzero(masked_rows_new) 707 masked_inds_val = sorted(list(set(masked_inds_new) - set(masked_inds))) 708 return X_incomplete, masked_inds_val 709 710 @staticmethod 711 def _create_cov_mask(cov, strength_mat, percent_vals_masked): 712 cov_flattened = cov.reshape(-1).copy() 713 num_vals_masked = int(percent_vals_masked * len(cov_flattened) / 100) 714 pos_masked = strength_mat.reshape(-1).argsort()[:num_vals_masked] 715 cov_flattened[pos_masked] = np.nan 716 cov_masked = cov_flattened.reshape(cov.shape) 717 return cov_masked 718 719 def _regularized_optimization_ils(self, X_incomplete, covariance_matrix_file, n_components): 720 def run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file): 721 start_time = time.time() 722 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 723 has_missing = np.isnan(X_incomplete.data).sum() > 0 724 S, _, strength_mat = self._cov(X_incomplete) 725 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 726 S = self._demean(S, np.ones(S.shape[0]) / S.shape[0]) 727 if has_missing: 728 logging.info("Starting matrix completion. This will take a few minutes...") 729 start_time = time.time() 730 percent_inds_val = 20 # Percent of unmasked individuals to be masked for cross-validation 731 X_incomplete, masked_inds_val = self._create_validation_mask(X_incomplete.data, percent_inds_val) # masked_inds_val is the list of indices of the individuals masked for validation 732 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 733 S_prime, _, w_mat_prime = self._cov(X_incomplete) 734 del X_incomplete 735 gc.collect() 736 S_prime = self._demean(S_prime, np.ones(S_prime.shape[0]) / S_prime.shape[0]) 737 S_robust, lam = self.matrix_completion(S, strength_mat, S_prime, w_mat_prime, lams=None, method="NN", 738 cv_inds=masked_inds_val) 739 logging.info(f"Covariance Matrix --- %{time.time() - start_time:.2}s seconds ---") 740 else: 741 S_robust = S.copy() 742 if covariance_matrix_file: 743 np.save(covariance_matrix_file, S.data) 744 if has_missing: 745 base, ext = os.path.splitext(covariance_matrix_file) 746 np.save(f"{base}_completed_{lam}{ext}", S_robust.data) 747 return S, S_robust 748 749 def _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components): 750 _, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))) 751 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 752 missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0) 753 missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True) 754 svd = TruncatedSVD(n_components, algorithm="arpack") 755 svd.fit(S_robust) 756 U = svd.components_.T 757 U_nonmissing = U[nonmissing] 758 D_nonmissing = np.diag(np.sqrt(svd.singular_values_)) 759 T = np.zeros((X_incomplete.shape[0], n_components)) 760 T_filled = np.matmul(U_nonmissing, D_nonmissing) 761 i_filled = nonmissing 762 T[nonmissing] = T_filled 763 for i in missing_sorted: 764 S_i = S[i][i_filled] 765 W_i = np.diag(strength_mat[i][i_filled]) 766 A = np.matmul(W_i, T_filled) 767 b = np.matmul(W_i, S_i) 768 t = np.linalg.lstsq(A, b, rcond=None)[0] 769 T[i] = t 770 i_filled = np.append(i_filled, i) 771 T_filled = np.append(T_filled, [t], axis=0) 772 total_var = np.trace(S) 773 for i in range(n_components): 774 pc_percentvar = 100 * svd.singular_values_[i] / total_var 775 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 776 return T 777 778 S, S_robust = run_cov_matrix_regularized_optimization_ils(X_incomplete, covariance_matrix_file) 779 T = _compute_projection_regularized_optimization_ils(X_incomplete, S, S_robust, n_components) 780 return T 781 782 def _cov_matrix_imputation(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components): 783 start_time = time.time() 784 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 785 S, strength_vec, strength_mat = self._cov(X_incomplete) 786 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 787 S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked) 788 S = self._iterative_svd_matrix_completion(S_masked) 789 weights = np.ones(len(strength_vec)) 790 weights_normalized = weights / weights.sum() 791 S = self._demean(S, weights_normalized) 792 W = np.diag(weights) 793 WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W)) 794 if covariance_matrix_file: 795 np.save(covariance_matrix_file, S.data) 796 svd = TruncatedSVD(n_components, algorithm="arpack") 797 svd.fit(WSW) 798 U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T 799 D = np.diag(np.sqrt(svd.singular_values_)) 800 T = np.matmul(U, D) 801 total_var = np.trace(S) 802 for i in range(n_components): 803 pc_percentvar = 100 * svd.singular_values_[i] / total_var 804 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 805 return T 806 807 def _cov_matrix_imputation_ils(self, X_incomplete, percent_vals_masked, covariance_matrix_file, n_components): 808 809 def run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file): 810 start_time = time.time() 811 X_incomplete = np.ma.array(X_incomplete, mask=np.isnan(X_incomplete)) 812 S, strength_vec, strength_mat = self._cov(X_incomplete) 813 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 814 S_masked = self._create_cov_mask(S, strength_mat, percent_vals_masked) 815 S = self._iterative_svd_matrix_completion(S_masked) 816 weights = np.ones(len(strength_vec)) 817 weights_normalized = weights / weights.sum() 818 S = self._demean(S, weights_normalized) 819 W = np.diag(weights) 820 if covariance_matrix_file: 821 np.save(covariance_matrix_file, S.data) 822 return S, W 823 824 def compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components): 825 S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))) 826 S = self._demean(S, np.ones(S.shape[0]) / S.shape[0]) 827 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 828 missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0) 829 missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True) 830 svd = TruncatedSVD(n_components, algorithm="arpack") 831 WSW = np.matmul(np.matmul(np.sqrt(W), S), np.sqrt(W)) 832 svd.fit(WSW) 833 U = np.matmul(svd.components_, np.linalg.inv(np.sqrt(W))).T 834 U_nonmissing = U[nonmissing] 835 D_nonmissing = np.diag(np.sqrt(svd.singular_values_)) 836 T = np.zeros((X_incomplete.shape[0], n_components)) 837 T_filled = np.matmul(U_nonmissing, D_nonmissing) 838 i_filled = nonmissing 839 T[nonmissing] = T_filled 840 for i in missing_sorted: 841 S_i = S[i][i_filled] 842 W_i = np.diag(strength_mat[i][i_filled]) 843 A = np.matmul(W_i, T_filled) 844 b = np.matmul(W_i, S_i) 845 t = np.linalg.lstsq(A, b, rcond=None)[0] 846 T[i] = t 847 i_filled = np.append(i_filled, i) 848 T_filled = np.append(T_filled, [t], axis=0) 849 total_var = np.trace(S) 850 for i in range(n_components): 851 pc_percentvar = 100 * svd.singular_values_[i] / total_var 852 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 853 return T 854 855 S, W = run_cov_matrix_cov_matrix_imputation_ils(X_incomplete, covariance_matrix_file) 856 T = compute_projection_cov_matrix_imputation_ils(X_incomplete, S, W, n_components) 857 return T 858 859 def _nonmissing_pca_ils(self, X_incomplete, covariance_matrix_file, n_components): 860 861 def run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file): 862 start_time = time.time() 863 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 864 X_incomplete_nonmissing = X_incomplete[nonmissing] 865 X_incomplete_nonmissing = np.ma.array(X_incomplete_nonmissing, mask=np.isnan(X_incomplete_nonmissing)) 866 S_nonmissing, _, _ = self._cov(X_incomplete_nonmissing) 867 logging.info("Covariance Matrix --- %s seconds ---" % (time.time() - start_time)) 868 S_nonmissing = self._demean(S_nonmissing, np.ones(S_nonmissing.shape[0]) / S_nonmissing.shape[0]) 869 if covariance_matrix_file: 870 np.save(covariance_matrix_file, S_nonmissing.data) 871 return S_nonmissing 872 873 def compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components): 874 S, _, strength_mat = self._cov(np.ma.array(X_incomplete, mask=np.isnan(X_incomplete))) 875 S = self._demean(S, np.ones(S.shape[0]) / S.shape[0]) 876 nonmissing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) == 0) 877 missing = np.flatnonzero(np.isnan(X_incomplete).sum(axis=1) > 0) 878 missing_sorted = sorted(missing, key=lambda x: (~np.isnan(X_incomplete[x])).sum(), reverse=True) 879 svd = TruncatedSVD(n_components, algorithm="arpack") 880 svd.fit(S_nonmissing) 881 U_nonmissing = svd.components_.T 882 D_nonmissing = np.diag(np.sqrt(svd.singular_values_)) 883 T = np.zeros((X_incomplete.shape[0], n_components)) 884 T_filled = np.matmul(U_nonmissing, D_nonmissing) 885 i_filled = nonmissing 886 T[nonmissing] = T_filled 887 for i in missing_sorted: 888 S_i = S[i][i_filled] 889 W_i = np.diag(strength_mat[i][i_filled]) 890 A = np.matmul(W_i, T_filled) 891 b = np.matmul(W_i, S_i) 892 t = np.linalg.lstsq(A, b, rcond=None)[0] 893 T[i] = t 894 i_filled = np.append(i_filled, i) 895 T_filled = np.append(T_filled, [t], axis=0) 896 total_var = np.trace(S) 897 for i in range(n_components): 898 pc_percentvar = 100 * svd.singular_values_[i] / total_var 899 logging.info("Percent variance explained by the principal component %d: %s", i + 1, pc_percentvar) 900 return T 901 902 S_nonmissing = run_cov_matrix_nonmissing_pca_ils(X_incomplete, covariance_matrix_file) 903 T = compute_projection_nonmissing_pca_ils(X_incomplete, S_nonmissing, n_components) 904 return T 905 906 def _run_cov_matrix(self, X_incomplete, weights): 907 """ 908 Runs the specified PCA method on the incomplete data. 909 """ 910 if self.method == "weighted_cov_pca": 911 return self._weighted_cov_pca(X_incomplete, weights, self.covariance_matrix_file, self.n_components) 912 elif self.method == "regularized_optimization_ils": 913 return self._regularized_optimization_ils(X_incomplete, self.covariance_matrix_file, self.n_components) 914 elif self.method == "cov_matrix_imputation": 915 return self._cov_matrix_imputation(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components) 916 elif self.method == "cov_matrix_imputation_ils": 917 return self._cov_matrix_imputation_ils(X_incomplete, self.percent_vals_masked, self.covariance_matrix_file, self.n_components) 918 elif self.method == "nonmissing_pca_ils": 919 return self._nonmissing_pca_ils(X_incomplete, self.covariance_matrix_file, self.n_components) 920 else: 921 raise ValueError(f"Unsupported method: {self.method}") 922 923 def fit_transform( 924 self, 925 snpobj: Optional['SNPObject'] = None, 926 laiobj: Optional['LocalAncestryObject'] = None, 927 labels_file: Optional[str] = None, 928 ancestry: Optional[str] = None, 929 average_strands: Optional[bool] = None 930 ) -> np.ndarray: 931 """ 932 Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data. 933 934 This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying 935 masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these 936 masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality 937 while handling missing data as specified. 938 939 Args: 940 snpobj (SNPObject, optional): 941 A SNPObject instance. 942 laiobj (LAIObject, optional): 943 A LAIObject instance. 944 labels_file (str, optional): 945 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 946 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 947 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 948 combined into groups, with respective weights. 949 ancestry (str, optional): 950 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 951 average_strands (bool, optional): 952 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 953 If None, defaults to `self.average_strands`. 954 955 Returns: 956 **array of shape (n_samples, n_components):** 957 The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`. 958 """ 959 if snpobj is None: 960 snpobj = self.snpobj 961 if laiobj is None: 962 laiobj = self.laiobj 963 if labels_file is None: 964 labels_file = self.labels_file 965 if ancestry is None: 966 ancestry = self.ancestry 967 if average_strands is None: 968 average_strands = self.average_strands 969 970 if self.load_masks: 971 masks, rs_id_list, ind_id_list, _, weights = self._load_mask_file() 972 else: 973 masks, rs_id_list, ind_id_list = array_process( 974 self.snpobj, 975 self.laiobj, 976 self.average_strands, 977 self.prob_thresh, 978 self.is_masked, 979 self.rsid_or_chrompos 980 ) 981 982 masks, ind_id_list, _, weights = process_labels_weights( 983 self.labels_file, 984 masks, 985 rs_id_list, 986 ind_id_list, 987 self.average_strands, 988 self.ancestry, 989 self.min_percent_snps, 990 self.groups_to_remove, 991 self.is_weighted, 992 self.save_masks, 993 self.masks_file 994 ) 995 996 X_incomplete, _, _ = self._process_masks(masks, rs_id_list, ind_id_list) 997 998 # Call run_cov_matrix with the specified method 999 self.X_new_ = self._run_cov_matrix( 1000 X_incomplete, 1001 weights 1002 ) 1003 1004 self.haplotypes_ = ind_id_list 1005 1006 return self.X_new_
A class for missing data principal component analysis (mdPCA).
This class supports both separate and averaged strand processing for SNP data. If the snpobj
,
laiobj
, labels_file
, and ancestry
parameters are all provided during instantiation,
the fit_transform
method will be automatically called, applying the specified mdPCA method to transform
the data upon instantiation.
28 def __init__( 29 self, 30 method: str = 'weighted_cov_pca', 31 snpobj: Optional['SNPObject'] = None, 32 laiobj: Optional['LocalAncestryObject'] = None, 33 labels_file: Optional[str] = None, 34 ancestry: Optional[str] = None, 35 is_masked: bool = True, 36 prob_thresh: float = 0, 37 average_strands: bool = False, 38 is_weighted: bool = False, 39 groups_to_remove: Dict[int, List[str]] = {}, 40 min_percent_snps: float = 4, 41 save_masks: bool = False, 42 load_masks: bool = False, 43 masks_file: Union[str, pathlib.Path] = 'masks.npz', 44 output_file: Union[str, pathlib.Path] = 'output.tsv', 45 covariance_matrix_file: Optional[str] = None, 46 n_components: int = 2, 47 rsid_or_chrompos: int = 2, 48 percent_vals_masked: float = 0 49 ): 50 """ 51 Args: 52 method (str, default='weighted_cov_pca'): 53 The PCA method to use for dimensionality reduction. Options include: 54 - `'weighted_cov_pca'`: 55 Simple covariance-based PCA, weighted by sample strengths. 56 - `'regularized_optimization_ils'`: 57 Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of 58 missing samples using the original covariance matrix (considering only relevant elements not missing in 59 the original covariance matrix for those samples). 60 - `'cov_matrix_imputation'`: 61 Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values 62 using the Iterative SVD imputation method. 63 - `'cov_matrix_imputation_ils'`: 64 The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space 65 given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just 66 as done in 'regularized_optimization_ils'. 67 - `'nonmissing_pca_ils'`: 68 The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto 69 the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as 70 done in 'regularized_optimization_ils'. 71 snpobj (SNPObject, optional): 72 A SNPObject instance. 73 laiobj (LAIObject, optional): 74 A LAIObject instance. 75 labels_file (str, optional): 76 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 77 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 78 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 79 combined into groups, with respective weights. 80 ancestry (str, optional): 81 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. 82 is_masked (bool, default=True): 83 True if an ancestry file is passed for ancestry-specific masking, or False otherwise. 84 prob_thresh (float, default=0): 85 Minimum probability threshold for a SNP to belong to an ancestry. 86 average_strands (bool, default=False): 87 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 88 is_weighted (bool, default=False): 89 True if weights are provided in the labels file, or False otherwise. 90 groups_to_remove (dict of int to list of str, default={}): 91 Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 92 lists of groups to remove for each array. 93 Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 94 min_percent_snps (float, default=4): 95 Minimum percentage of SNPs that must be known for an individual to be included in the analysis. 96 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 97 save_masks (bool, default=False): 98 True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 99 load_masks (bool, default=False): 100 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise. 101 masks_file (str or pathlib.Path, default='masks.npz'): 102 Path to the `.npz` file used for saving/loading masked matrices. 103 output_file (str or pathlib.Path, default='output.tsv'): 104 Path to the output `.tsv` file where mdPCA results are saved. 105 covariance_matrix_file (str, optional): 106 Path to save the covariance matrix file in `.npy` format. If None, the covariance matrix is not saved. Default is None. 107 n_components (int, default=2): 108 The number of principal components. 109 rsid_or_chrompos (int, default=2): 110 Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 111 percent_vals_masked (float, default=0): 112 Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 113 `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`. 114 """ 115 self.__snpobj = snpobj 116 self.__laiobj = laiobj 117 self.__labels_file = labels_file 118 self.__ancestry = ancestry 119 self.__method = method 120 self.__is_masked = is_masked 121 self.__prob_thresh = prob_thresh 122 self.__average_strands = average_strands 123 self.__is_weighted = is_weighted 124 self.__groups_to_remove = groups_to_remove 125 self.__min_percent_snps = min_percent_snps 126 self.__save_masks = save_masks 127 self.__load_masks = load_masks 128 self.__masks_file = masks_file 129 self.__output_file = output_file 130 self.__covariance_matrix_file = covariance_matrix_file 131 self.__n_components = n_components 132 self.__rsid_or_chrompos = rsid_or_chrompos 133 self.__percent_vals_masked = percent_vals_masked 134 self.__X_new_ = None # Store transformed SNP data 135 self.__haplotypes_ = None # Store haplotypes after filtering if min_percent_snps > 0 136 self.__samples_ = None # Store samples after filtering if min_percent_snps > 0 137 138 # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided 139 if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None: 140 self.fit_transform(snpobj, laiobj, labels_file, ancestry)
Arguments:
- method (str, default='weighted_cov_pca'): The PCA method to use for dimensionality reduction. Options include:
'weighted_cov_pca'
: Simple covariance-based PCA, weighted by sample strengths.'regularized_optimization_ils'
: Regularized optimization followed by iterative, weighted (via the strengths) least squares projection of missing samples using the original covariance matrix (considering only relevant elements not missing in the original covariance matrix for those samples).'cov_matrix_imputation'
: Eigen-decomposition of the covariance matrix after first imputing the covariance matrix missing values using the Iterative SVD imputation method.'cov_matrix_imputation_ils'
: The method of 'cov_matrix_imputation', but where afterwards missing samples are re-projected onto the space given by 'cov_matrix_imputation' using the same iterative method on the original covariance matrix just as done in 'regularized_optimization_ils'.'nonmissing_pca_ils'
: The method of 'weighted_cov_pca' on the non-missing samples, followed by the projection of missing samples onto the space given by 'weighted_cov_pca' using the same iterative method on the original covariance matrix just as done in 'regularized_optimization_ils'.
- snpobj (SNPObject, optional): A SNPObject instance.
- laiobj (LAIObject, optional): A LAIObject instance.
- labels_file (str, optional): Path to the labels file in .tsv format. The first column,
indID
, contains the individual identifiers, and the second column,label
, specifies the groups for all individuals. Ifis_weighted=True
, aweight
column with individual weights is required. Optionally,combination
andcombination_weight
columns can specify sets of individuals to be combined into groups, with respective weights. - ancestry (str, optional): Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at
0
. - is_masked (bool, default=True): True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
- prob_thresh (float, default=0): Minimum probability threshold for a SNP to belong to an ancestry.
- average_strands (bool, default=False): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
- is_weighted (bool, default=False): True if weights are provided in the labels file, or False otherwise.
- groups_to_remove (dict of int to list of str, default={}): Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are
lists of groups to remove for each array.
Example:
{1: ['group1', 'group2'], 2: [], 3: ['group3']}
. - min_percent_snps (float, default=4): Minimum percentage of SNPs that must be known for an individual to be included in the analysis. All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
- save_masks (bool, default=False): True if the masked matrices are to be saved in a
.npz
file, or False otherwise. - load_masks (bool, default=False): True if the masked matrices are to be loaded from a pre-existing
.npz
file specified bymasks_file
, or False otherwise. - masks_file (str or pathlib.Path, default='masks.npz'): Path to the
.npz
file used for saving/loading masked matrices. - output_file (str or pathlib.Path, default='output.tsv'): Path to the output
.tsv
file where mdPCA results are saved. - covariance_matrix_file (str, optional): Path to save the covariance matrix file in
.npy
format. If None, the covariance matrix is not saved. Default is None. - n_components (int, default=2): The number of principal components.
- rsid_or_chrompos (int, default=2): Format indicator for SNP IDs in the SNP data. Use 1 for
rsID
format or 2 forchromosome_position
. - percent_vals_masked (float, default=0): Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if
method
is'cov_matrix_imputation'
or'cov_matrix_imputation_ils'
.
230 @property 231 def ancestry(self) -> Optional[str]: 232 """ 233 Retrieve `ancestry`. 234 235 Returns: 236 **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 237 """ 238 return self.__ancestry
Retrieve ancestry
.
Returns:
str: Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
247 @property 248 def is_masked(self) -> bool: 249 """ 250 Retrieve `is_masked`. 251 252 Returns: 253 **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise. 254 """ 255 return self.__is_masked
Retrieve is_masked
.
Returns:
bool: True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
264 @property 265 def prob_thresh(self) -> float: 266 """ 267 Retrieve `prob_thresh`. 268 269 Returns: 270 **float:** Minimum probability threshold for a SNP to belong to an ancestry. 271 """ 272 return self.__prob_thresh
Retrieve prob_thresh
.
Returns:
float: Minimum probability threshold for a SNP to belong to an ancestry.
280 @property 281 def average_strands(self) -> bool: 282 """ 283 Retrieve `average_strands`. 284 285 Returns: 286 **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 287 """ 288 return self.__average_strands
Retrieve average_strands
.
Returns:
bool: True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
297 @property 298 def is_weighted(self) -> bool: 299 """ 300 Retrieve `is_weighted`. 301 302 Returns: 303 **bool:** True if weights are provided in the labels file, or False otherwise. 304 """ 305 return self.__is_weighted
Retrieve is_weighted
.
Returns:
bool: True if weights are provided in the labels file, or False otherwise.
314 @property 315 def groups_to_remove(self) -> Dict[int, List[str]]: 316 """ 317 Retrieve `groups_to_remove`. 318 319 Returns: 320 **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 321 lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 322 """ 323 return self.__groups_to_remove
Retrieve groups_to_remove
.
Returns:
dict of int to list of str: Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are lists of groups to remove for each array. Example:
{1: ['group1', 'group2'], 2: [], 3: ['group3']}
.
332 @property 333 def min_percent_snps(self) -> float: 334 """ 335 Retrieve `min_percent_snps`. 336 337 Returns: 338 **float:** 339 Minimum percentage of SNPs that must be known for an individual to be included in the analysis. 340 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 341 """ 342 return self.__min_percent_snps
Retrieve min_percent_snps
.
Returns:
float: Minimum percentage of SNPs that must be known for an individual to be included in the analysis. All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
351 @property 352 def save_masks(self) -> bool: 353 """ 354 Retrieve `save_masks`. 355 356 Returns: 357 **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 358 """ 359 return self.__save_masks
Retrieve save_masks
.
Returns:
bool: True if the masked matrices are to be saved in a
.npz
file, or False otherwise.
368 @property 369 def load_masks(self) -> bool: 370 """ 371 Retrieve `load_masks`. 372 373 Returns: 374 **bool:** 375 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 376 by `masks_file`, or False otherwise. 377 """ 378 return self.__load_masks
Retrieve load_masks
.
Returns:
bool: True if the masked matrices are to be loaded from a pre-existing
.npz
file specified bymasks_file
, or False otherwise.
387 @property 388 def masks_file(self) -> Union[str, pathlib.Path]: 389 """ 390 Retrieve `masks_file`. 391 392 Returns: 393 **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices. 394 """ 395 return self.__masks_file
Retrieve masks_file
.
Returns:
str or pathlib.Path: Path to the
.npz
file used for saving/loading masked matrices.
404 @property 405 def output_file(self) -> Union[str, pathlib.Path]: 406 """ 407 Retrieve `output_file`. 408 409 Returns: 410 **str or pathlib.Path:** Path to the output `.tsv` file where mdPCA results are saved. 411 """ 412 return self.__output_file
Retrieve output_file
.
Returns:
str or pathlib.Path: Path to the output
.tsv
file where mdPCA results are saved.
421 @property 422 def covariance_matrix_file(self) -> Optional[str]: 423 """ 424 Retrieve `covariance_matrix_file`. 425 426 Returns: 427 **str:** Path to save the covariance matrix file in `.npy` format. 428 """ 429 return self.__covariance_matrix_file
Retrieve covariance_matrix_file
.
Returns:
str: Path to save the covariance matrix file in
.npy
format.
455 @property 456 def rsid_or_chrompos(self) -> int: 457 """ 458 Retrieve `rsid_or_chrompos`. 459 460 Returns: 461 **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 462 """ 463 return self.__rsid_or_chrompos
Retrieve rsid_or_chrompos
.
Returns:
int: Format indicator for SNP IDs in the SNP data. Use 1 for
rsID
format or 2 forchromosome_position
.
472 @property 473 def percent_vals_masked(self) -> float: 474 """ 475 Retrieve `percent_vals_masked`. 476 477 Returns: 478 **float:** 479 Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if `method` is 480 `'cov_matrix_imputation'` or `'cov_matrix_imputation_ils'`. 481 """ 482 return self.__percent_vals_masked
Retrieve percent_vals_masked
.
Returns:
float: Percentage of values in the covariance matrix to be masked and then imputed. Only applicable if
method
is'cov_matrix_imputation'
or'cov_matrix_imputation_ils'
.
491 @property 492 def X_new_(self) -> Optional[np.ndarray]: 493 """ 494 Retrieve `X_new_`. 495 496 Returns: 497 **array of shape (n_samples, n_components):** 498 The transformed SNP data projected onto the `n_components` principal components. 499 n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 500 (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 501 `(n_samples * 2, n_components)`. 502 """ 503 return self.__X_new_
Retrieve X_new_
.
Returns:
array of shape (n_samples, n_components): The transformed SNP data projected onto the
n_components
principal components. n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied (min_percent_snps > 0
). For diploid individuals without filtering, the shape is(n_samples * 2, n_components)
.
512 @property 513 def haplotypes_(self) -> Optional[List[str]]: 514 """ 515 Retrieve `haplotypes_`. 516 517 Returns: 518 list of str: 519 A list of unique haplotype identifiers. 520 """ 521 if isinstance(self.__haplotypes_, np.ndarray): 522 return self.__haplotypes_.ravel().tolist() # Flatten and convert NumPy array to a list 523 elif isinstance(self.__haplotypes_, list): 524 if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray): 525 return self.__haplotypes_[0].ravel().tolist() # Handle list containing a single array 526 return self.__haplotypes_ # Already a flat list 527 elif self.__haplotypes_ is None: 528 return None # If no haplotypes are set 529 else: 530 raise TypeError("`haplotypes_` must be a list or a NumPy array.")
547 @property 548 def samples_(self) -> Optional[List[str]]: 549 """ 550 Retrieve `samples_`. 551 552 Returns: 553 list of str: 554 A list of sample identifiers based on `haplotypes_` and `average_strands`. 555 """ 556 haplotypes = self.haplotypes_ 557 if haplotypes is None: 558 return None 559 if self.__average_strands: 560 return haplotypes 561 else: 562 return [x[:-2] for x in haplotypes]
Retrieve samples_
.
Returns:
list of str: A list of sample identifiers based on
haplotypes_
andaverage_strands
.
564 @property 565 def n_haplotypes(self) -> Optional[int]: 566 """ 567 Retrieve `n_haplotypes`. 568 569 Returns: 570 **int:** 571 The total number of haplotypes, potentially reduced if filtering is applied 572 (`min_percent_snps > 0`). 573 """ 574 return len(self.haplotypes_)
Retrieve n_haplotypes
.
Returns:
int: The total number of haplotypes, potentially reduced if filtering is applied (
min_percent_snps > 0
).
576 @property 577 def n_samples(self) -> Optional[int]: 578 """ 579 Retrieve `n_samples`. 580 581 Returns: 582 **int:** 583 The total number of samples, potentially reduced if filtering is applied 584 (`min_percent_snps > 0`). 585 """ 586 return len(np.unique(self.samples_))
Retrieve n_samples
.
Returns:
int: The total number of samples, potentially reduced if filtering is applied (
min_percent_snps > 0
).
588 def copy(self) -> 'mdPCA': 589 """ 590 Create and return a copy of `self`. 591 592 Returns: 593 **mdPCA:** 594 A new instance of the current object. 595 """ 596 return copy.copy(self)
Create and return a copy of self
.
Returns:
mdPCA: A new instance of the current object.
923 def fit_transform( 924 self, 925 snpobj: Optional['SNPObject'] = None, 926 laiobj: Optional['LocalAncestryObject'] = None, 927 labels_file: Optional[str] = None, 928 ancestry: Optional[str] = None, 929 average_strands: Optional[bool] = None 930 ) -> np.ndarray: 931 """ 932 Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data. 933 934 This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying 935 masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these 936 masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality 937 while handling missing data as specified. 938 939 Args: 940 snpobj (SNPObject, optional): 941 A SNPObject instance. 942 laiobj (LAIObject, optional): 943 A LAIObject instance. 944 labels_file (str, optional): 945 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 946 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 947 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 948 combined into groups, with respective weights. 949 ancestry (str, optional): 950 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 951 average_strands (bool, optional): 952 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 953 If None, defaults to `self.average_strands`. 954 955 Returns: 956 **array of shape (n_samples, n_components):** 957 The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`. 958 """ 959 if snpobj is None: 960 snpobj = self.snpobj 961 if laiobj is None: 962 laiobj = self.laiobj 963 if labels_file is None: 964 labels_file = self.labels_file 965 if ancestry is None: 966 ancestry = self.ancestry 967 if average_strands is None: 968 average_strands = self.average_strands 969 970 if self.load_masks: 971 masks, rs_id_list, ind_id_list, _, weights = self._load_mask_file() 972 else: 973 masks, rs_id_list, ind_id_list = array_process( 974 self.snpobj, 975 self.laiobj, 976 self.average_strands, 977 self.prob_thresh, 978 self.is_masked, 979 self.rsid_or_chrompos 980 ) 981 982 masks, ind_id_list, _, weights = process_labels_weights( 983 self.labels_file, 984 masks, 985 rs_id_list, 986 ind_id_list, 987 self.average_strands, 988 self.ancestry, 989 self.min_percent_snps, 990 self.groups_to_remove, 991 self.is_weighted, 992 self.save_masks, 993 self.masks_file 994 ) 995 996 X_incomplete, _, _ = self._process_masks(masks, rs_id_list, ind_id_list) 997 998 # Call run_cov_matrix with the specified method 999 self.X_new_ = self._run_cov_matrix( 1000 X_incomplete, 1001 weights 1002 ) 1003 1004 self.haplotypes_ = ind_id_list 1005 1006 return self.X_new_
Fit the model to the SNP data stored in the provided snpobj
and apply the dimensionality reduction on the same SNP data.
This method starts by loading or updating SNP and ancestry data. Then, it manages missing values by applying masks based on ancestry, either by loading a pre-existing mask or generating new ones. After processing these masks to produce an incomplete SNP data matrix, it applies the chosen mdPCA method to reduce dimensionality while handling missing data as specified.
Arguments:
- snpobj (SNPObject, optional): A SNPObject instance.
- laiobj (LAIObject, optional): A LAIObject instance.
- labels_file (str, optional): Path to the labels file in .tsv format. The first column,
indID
, contains the individual identifiers, and the second column,label
, specifies the groups for all individuals. Ifis_weighted=True
, aweight
column with individual weights is required. Optionally,combination
andcombination_weight
columns can specify sets of individuals to be combined into groups, with respective weights. - ancestry (str, optional): Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
- average_strands (bool, optional): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
If None, defaults to
self.average_strands
.
Returns:
array of shape (n_samples, n_components): The transformed SNP data projected onto the
n_components
principal components, stored inself.X_new_
.