snputils.processing.pca
```python
import warnings
import torch
import numpy as np
import copy
from typing import Tuple, Optional, Union, List
from sklearn.decomposition import PCA as skPCA

from snputils.snp.genobj.snpobj import SNPObject


def _svd_flip(u, v, u_based_decision=True):
    """
    Sign correction to ensure deterministic output from SVD.

    Adjusts the columns of u and the rows of v such that the loadings in the
    columns in u that are largest in absolute value are always positive.
    """
    if u_based_decision:
        # columns of u, rows of v
        max_abs_cols = torch.argmax(torch.abs(u), axis=0)
        signs = torch.sign(u[max_abs_cols, range(u.shape[1])])
        u *= signs
        v *= signs[:, None]
    else:
        # rows of v, columns of u
        max_abs_rows = torch.argmax(torch.abs(v), axis=1)
        signs = torch.sign(v[range(v.shape[0]), max_abs_rows])
        u *= signs
        v *= signs[:, None]

    return u, v
```
131 """ 132 self.__n_components_ = x 133 134 @property 135 def components_(self) -> Optional[torch.Tensor]: 136 """ 137 Retrieve `components_`. 138 139 Returns: 140 **tensor of shape (n_components_, n_snps):** 141 Matrix of principal components, where each row is a principal component vector. 142 """ 143 return self.__components_ 144 145 @components_.setter 146 def components_(self, x: torch.Tensor) -> None: 147 """ 148 Update `components_`. 149 """ 150 self.__components_ = x 151 152 @property 153 def mean_(self) -> Optional[torch.Tensor]: 154 """ 155 Retrieve `mean_`. 156 157 Returns: 158 **tensor of shape (n_snps,):** 159 Per-feature mean vector of the input data used for centering. 160 """ 161 return self.__mean_ 162 163 @mean_.setter 164 def mean_(self, x: torch.Tensor) -> None: 165 """ 166 Update `mean_`. 167 """ 168 self.__mean_ = x 169 170 @property 171 def X_new_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 172 """ 173 Retrieve `X_new_`. 174 175 Returns: 176 **tensor of shape (n_samples, n_components_):** 177 The transformed SNP data projected onto the `n_components_` principal components. 178 """ 179 return self.__X_new_ 180 181 @X_new_.setter 182 def X_new_(self, x: torch.Tensor) -> None: 183 """ 184 Update `X_new_`. 185 """ 186 self.__X_new_ = x 187 188 def copy(self) -> 'TorchPCA': 189 """ 190 Create and return a copy of `self`. 191 192 Returns: 193 **TorchPCA:** 194 A new instance of the current object. 195 """ 196 return copy.copy(self) 197 198 def _fit(self, X: torch.Tensor) -> Tuple: 199 """ 200 Internal method to fit the PCA model to the data `X`. 201 202 Args: 203 X (tensor of shape (n_samples, n_snps)): 204 Input SNP data used for fitting the model. 205 206 Returns: 207 Tuple: U, S, and Vt matrices from the SVD, where: 208 - `U` has shape (n_samples, n_components). 209 - `S` contains singular values (n_components). 210 - `Vt` has shape (n_components, n_snps) and represents the principal components. 211 """ 212 n_samples, n_snps = X.shape 213 self.n_components_ = min(self.n_components or min(X.shape), min(X.shape)) 214 215 if self.n_components_ > min(X.shape): 216 raise ValueError(f"n_components should be <= min(n_samples: {n_samples}, n_snps: {n_snps})") 217 218 # Compute the mean to center the data 219 self.mean_ = torch.mean(X, dim=0) 220 221 # Choose SVD method based on the fitting type 222 if self.fitting == "full": 223 U, S, Vt = torch.linalg.svd(X - self.mean_, full_matrices=True) 224 elif self.fitting == "reduced": 225 U, S, Vt = torch.linalg.svd(X - self.mean_, full_matrices=False) 226 elif self.fitting == "lowrank": 227 U, S, V = torch.svd_lowrank(X, q=self.n_components_, M=self.mean_) 228 Vt = V.mT # Transpose V to get Vt 229 else: 230 raise ValueError(f"Unrecognized fitting method: {self.fitting}") 231 232 # Select the first `n_components` columns and singular values 233 U = U[:, :self.n_components_] 234 S = S[:self.n_components_] 235 Vt = Vt[:self.n_components_] 236 237 # Ensure deterministic output for U and Vt signs 238 U, Vt = _svd_flip(U, Vt) 239 self.components_ = Vt 240 241 return U, S, Vt 242 243 def fit(self, X: torch.Tensor) -> 'TorchPCA': 244 """ 245 Fit the model to the input SNP data. 246 247 Args: 248 X (tensor of shape (n_samples, n_snps)): 249 The SNP data matrix to fit the model. 250 251 Returns: 252 **TorchPCA:** 253 The fitted instance of `self`. 254 """ 255 self._fit(X) 256 return self 257 258 def transform(self, X: torch.Tensor) -> torch.Tensor: 259 """ 260 Apply dimensionality reduction to the input SNP data using the fitted model. 
261 262 Args: 263 X (tensor of shape (n_samples, n_snps)): 264 The SNP data matrix to be transformed. 265 266 Returns: 267 **tensor of shape (n_samples, n_components_):** 268 The transformed SNP data projected onto the `n_components_` principal components, 269 stored in `self.X_new_`. 270 """ 271 if self.components_ is None or self.mean_ is None: 272 raise ValueError("The PCA model must be fitted before calling `transform`.") 273 274 self.X_new_ = torch.matmul(X - self.mean_, self.components_.T) 275 return self.X_new_ 276 277 def fit_transform(self, X): 278 """ 279 Fit the model to the SNP data and apply dimensionality reduction on the same SNP data. 280 281 Args: 282 X (tensor of shape n_samples, n_snps): 283 The SNP data matrix used for both fitting and transformation. 284 285 Returns: 286 **tensor of shape (n_samples, n_components_):** 287 The transformed SNP data projected onto the `n_components_` principal components, 288 stored in `self.X_new_`. 289 """ 290 U, S, _ = self._fit(X) 291 self.X_new_ = U * S.unsqueeze(0) 292 return self.X_new_ 293 294 295class PCA: 296 """ 297 A class for Principal Component Analysis (PCA) on SNP data stored in a `snputils.snp.genobj.SNPObject`. 298 This class employs either 299 [sklearn.decomposition.PCA](https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html) 300 or the custom `TorchPCA` implementation for efficient GPU-accelerated analysis. 301 302 The PCA class supports both separate and averaged strand processing for SNP data. If the `snpobj` parameter is 303 provided during instantiation, the `fit_transform` method will be automatically called, 304 applying PCA to transform the data according to the selected configuration. 305 """ 306 def __init__( 307 self, 308 snpobj: Optional['SNPObject'] = None, 309 backend: str = 'pytorch', 310 n_components: int = 2, 311 fitting: str = 'full', 312 device: str = 'cpu', 313 average_strands: bool = True, 314 samples_subset: Optional[Union[int, List]] = None, 315 snps_subset: Optional[Union[int, List]] = None 316 ): 317 """ 318 Args: 319 snpobj (SNPObject, optional): 320 A SNPObject instance. 321 backend (str, default='pytorch'): 322 The backend to use (`'sklearn'` or `'pytorch'`). Default is 'pytorch'. 323 n_components (int, default=2): 324 The number of principal components. Default is 2. 325 fitting (str, default='full'): 326 The fitting approach to use for the SVD computation (only for `backend='pytorch'`). 327 - `'full'`: Full Singular Value Decomposition (SVD). 328 - `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution. 329 Default is 'full'. 330 device (str, default='cpu'): 331 Device to use (`'cpu'`, `'gpu'`, `'cuda'`, or `'cuda:<index>'`). Default is 'cpu'. 332 average_strands (bool, default=True): 333 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 334 samples_subset (int or list of int, optional): 335 Subset of samples to include, as an integer for the first samples or a list of sample indices. 336 snps_subset (int or list of int, optional): 337 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 
338 """ 339 self.__snpobj = snpobj 340 self.__backend = backend 341 self.__n_components = n_components 342 self.__fitting = fitting 343 self.__device = self._process_device_argument(device) 344 self.__average_strands = average_strands 345 self.__samples_subset = samples_subset 346 self.__snps_subset = snps_subset 347 self.__X_ = None 348 self.__X_new_ = None # Store transformed SNP data 349 self.__n_components_ = None 350 self.__components_ = None 351 self.__mean_ = None 352 353 # Initialize PCA backend 354 if self.backend == "pytorch": 355 self.pca = TorchPCA(n_components=self.n_components, fitting=self.fitting) 356 elif self.backend == "sklearn": 357 self.pca = skPCA(n_components=self.n_components) 358 else: 359 raise ValueError("Unknown backend for PCA: ", backend) 360 361 # Fit and transform if a `snpobj` is provided 362 if self.snpobj is not None: 363 self.fit_transform(snpobj) 364 365 @property 366 def snpobj(self) -> Optional['SNPObject']: 367 """ 368 Retrieve `snpobj`. 369 370 Returns: 371 **SNPObject:** A SNPObject instance. 372 """ 373 return self.__snpobj 374 375 @snpobj.setter 376 def snpobj(self, x) -> None: 377 """ 378 Update `snpobj`. 379 """ 380 self.__snpobj = x 381 382 @property 383 def backend(self) -> str: 384 """ 385 Retrieve `backend`. 386 387 Returns: 388 **str:** The backend to use (`'sklearn'` or `'pytorch'`). 389 """ 390 return self.__backend 391 392 @backend.setter 393 def backend(self, x: str) -> None: 394 """ 395 Update `backend`. 396 """ 397 self.__backend = x 398 399 @property 400 def n_components(self) -> int: 401 """ 402 Retrieve `n_components`. 403 404 Returns: 405 **int:** The number of principal components. 406 """ 407 return self.__n_components 408 409 @n_components.setter 410 def n_components(self, x: int) -> None: 411 """ 412 Update `n_components`. 413 """ 414 self.__n_components = x 415 416 @property 417 def fitting(self) -> str: 418 """ 419 Retrieve `fitting`. 420 421 Returns: 422 **str:** 423 The fitting approach to use for the SVD computation (only for `backend='pytorch'`). 424 - `'full'`: Full Singular Value Decomposition (SVD). 425 - `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution. 426 """ 427 return self.__fitting 428 429 @fitting.setter 430 def fitting(self, x: str) -> None: 431 """ 432 Update `fitting`. 433 """ 434 self.__fitting = x 435 436 @property 437 def device(self) -> torch.device: 438 """ 439 Retrieve `device`. 440 441 Returns: 442 **torch.device:** Device to use (`'cpu'`, `'gpu'`, `'cuda'`, or `'cuda:<index>'`). 443 """ 444 return self.__device 445 446 @device.setter 447 def device(self, x: str) -> None: 448 """ 449 Update `device`. 450 """ 451 self.__device = self._process_device_argument(x) 452 453 @property 454 def average_strands(self) -> bool: 455 """ 456 Retrieve `average_strands`. 457 458 Returns: 459 **bool:** 460 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 461 """ 462 return self.__average_strands 463 464 @average_strands.setter 465 def average_strands(self, x: bool) -> None: 466 """ 467 Update `average_strands`. 468 """ 469 self.__average_strands = x 470 471 @property 472 def samples_subset(self) -> Optional[Union[int, List[int]]]: 473 """ 474 Retrieve `samples_subset`. 475 476 Returns: 477 **int or list of int:** 478 Subset of samples to include, as an integer for the first samples or a list of sample indices. 
479 """ 480 return self.__samples_subset 481 482 @samples_subset.setter 483 def samples_subset(self, x: Optional[Union[int, List[int]]]) -> None: 484 """ 485 Update `samples_subset`. 486 """ 487 self.__samples_subset = x 488 489 @property 490 def snps_subset(self) -> Optional[Union[int, List[int]]]: 491 """ 492 Retrieve `snps_subset`. 493 494 Returns: 495 **int or list of int:** Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 496 """ 497 return self.__snps_subset 498 499 @snps_subset.setter 500 def snps_subset(self, x: Optional[Union[int, List[int]]]) -> None: 501 """ 502 Update `snps_subset`. 503 """ 504 self.__snps_subset = x 505 506 @property 507 def n_components_(self) -> Optional[int]: 508 """ 509 Retrieve `n_components_`. 510 511 Returns: 512 **int:** 513 The effective number of components retained after fitting, 514 calculated as `min(self.n_components, min(n_samples, n_snps))`. 515 """ 516 return self.__n_components_ 517 518 @n_components_.setter 519 def n_components_(self, x: int) -> None: 520 """ 521 Update `n_components_`. 522 """ 523 self.__n_components_ = x 524 525 @property 526 def components_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 527 """ 528 Retrieve `components_`. 529 530 Returns: 531 **tensor or array of shape (n_components_, n_snps):** 532 Matrix of principal components, where each row is a principal component vector. 533 """ 534 return self.__components_ 535 536 @components_.setter 537 def components_(self, x: Union[torch.Tensor, np.ndarray]) -> None: 538 """ 539 Update `components_`. 540 """ 541 self.__components_ = x 542 543 @property 544 def mean_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 545 """ 546 Retrieve `mean_`. 547 548 Returns: 549 **tensor or array of shape (n_snps,):** 550 Per-feature mean vector of the input data used for centering. 551 """ 552 return self.__mean_ 553 554 @mean_.setter 555 def mean_(self, x: Union[torch.Tensor, np.ndarray]) -> None: 556 """ 557 Update `mean_`. 558 """ 559 self.__mean_ = x 560 561 @property 562 def X_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 563 """ 564 Retrieve `X_`. 565 566 Returns: 567 **tensor or array of shape (n_samples, n_snps):** 568 The SNP data matrix used to fit the model. 569 """ 570 return self.__X_ 571 572 @X_.setter 573 def X_(self, x: Union[torch.Tensor, np.ndarray]) -> None: 574 """ 575 Update `X_`. 576 """ 577 self.__X_ = x 578 579 @property 580 def X_new_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 581 """ 582 Retrieve `X_new_`. 583 584 Returns: 585 **tensor or array of shape (n_samples, n_components_):** 586 The transformed SNP data projected onto the `n_components_` principal components. 587 """ 588 return self.__X_new_ 589 590 @X_new_.setter 591 def X_new_(self, x: Union[torch.Tensor, np.ndarray]) -> None: 592 """ 593 Update `X_new_`. 594 """ 595 self.__X_new_ = x 596 597 def copy(self) -> 'PCA': 598 """ 599 Create and return a copy of `self`. 600 601 Returns: 602 **PCA:** 603 A new instance of the current object. 604 """ 605 return copy.copy(self) 606 607 def _process_device_argument(self, device: str): 608 """ 609 Process the device argument to map user-friendly device names to PyTorch device specifications. 610 611 Args: 612 device (str): Device specified by the user. 613 614 Returns: 615 torch.device: PyTorch device object. 
616 """ 617 if isinstance(device, str): 618 device_lower = device.lower() 619 if device_lower in ['cpu']: 620 return torch.device('cpu') 621 elif device_lower in ['gpu', 'cuda']: 622 if torch.cuda.is_available(): 623 return torch.device('cuda') 624 else: 625 warnings.warn("CUDA is not available; using CPU instead.") 626 return torch.device('cpu') 627 elif device_lower.startswith('cuda:'): 628 if torch.cuda.is_available(): 629 return torch.device(device_lower) 630 else: 631 warnings.warn(f"CUDA is not available; requested device '{device}' is not available. Using CPU instead.") 632 return torch.device('cpu') 633 else: 634 raise ValueError(f"Unknown device type: '{device}'. Please use 'CPU', 'GPU', 'cuda', or 'cuda:<index>'.") 635 elif isinstance(device, torch.device): 636 return device 637 else: 638 raise TypeError(f"Device must be a string or torch.device, got {type(device)}.") 639 640 def _get_data_from_snpobj( 641 self, 642 snpobj: Optional['SNPObject'] = None, 643 average_strands: Optional[bool] = None, 644 samples_subset: Optional[Union[int, List]] = None, 645 snps_subset: Optional[Union[int, List]] = None 646 ) -> Union[np.ndarray, torch.Tensor]: 647 """ 648 Retrieve and prepare SNP data for PCA analysis, with options for selecting subsets and handling strands. 649 650 This method processes SNP data stored in an `SNPObject`, which may include averaging of paternal 651 and maternal strands or selecting subsets of samples and SNPs. The prepared data is formatted 652 for use in PCA, optionally converted to a PyTorch tensor if the backend is set to 'pytorch'. 653 654 Args: 655 snpobj (SNPObject, optional): 656 A SNPObject object instance. If None, defaults to `self.snpobj`. 657 average_strands (bool, optional): 658 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 659 If None, defaults to `self.average_strands`. 660 samples_subset (int or list of int, optional): 661 Subset of samples to include, as an integer for the first samples or a list of sample indices. 662 If None, defaults to `self.samples_subset`. 663 snps_subset (int or list of int, optional): 664 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 665 If None, defaults to `self.snps_subset`. 666 667 Returns: 668 numpy.ndarray or torch.Tensor: 669 The processed SNP data. If `backend` is set to 'pytorch', the data is returned as a 670 PyTorch tensor on the specified `device`. 
671 """ 672 if snpobj is None: 673 snpobj = self.snpobj 674 if average_strands is None: 675 average_strands = self.average_strands 676 if samples_subset is None: 677 samples_subset = self.samples_subset 678 if snps_subset is None: 679 snps_subset = self.snps_subset 680 681 if snpobj.calldata_gt.ndim == 2: 682 X = np.transpose(snpobj.calldata_gt.astype(float), (1,0)) 683 elif snpobj.calldata_gt.ndim == 3: 684 X = np.transpose(snpobj.calldata_gt.astype(float), (1,0,2)) 685 686 if average_strands: 687 X = np.mean(X, axis=2) 688 else: 689 X = np.reshape(X, (-1, X.shape[1])) 690 else: 691 raise ValueError(f"Invalid shape for `calldata_gt`: expected a 2D or 3D array, but got {snpobj.calldata_gt.ndim}D array.") 692 693 # Handle sample and SNP subsets 694 if isinstance(samples_subset, int): 695 X = X[:samples_subset] 696 elif isinstance(samples_subset, list): 697 X = X[samples_subset] 698 699 if isinstance(snps_subset, int): 700 X = X[:, :snps_subset] 701 elif isinstance(snps_subset, list): 702 X = X[:, snps_subset] 703 704 if self.backend == "pytorch": 705 print(f"Converting data to PyTorch tensor on device {self.device}") 706 X = torch.from_numpy(X).to(self.device) 707 708 return X 709 710 def fit( 711 self, 712 snpobj: Optional['SNPObject'] = None, 713 average_strands: Optional[bool] = None, 714 samples_subset: Optional[Union[int, List]] = None, 715 snps_subset: Optional[Union[int, List]] = None 716 ) -> 'PCA': 717 """ 718 Fit the model to the input SNP data stored in the provided `snpobj`. 719 720 Args: 721 snpobj (SNPObject, optional): 722 A SNPObject instance. If None, defaults to `self.snpobj`. 723 average_strands (bool, optional): 724 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 725 If None, defaults to `self.average_strands`. 726 samples_subset (int or list of int, optional): 727 Subset of samples to include, as an integer for the first samples or a list of sample indices. 728 If None, defaults to `self.samples_subset`. 729 snps_subset (int or list of int, optional): 730 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 731 If None, defaults to `self.snps_subset`. 732 733 Returns: 734 **PCA:** 735 The fitted instance of `self`. 736 """ 737 self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset) 738 self.pca.fit(self.X_) 739 740 # Update attributes based on the fitted model 741 self.n_components_ = self.pca.n_components_ 742 self.components_ = self.pca.components_ 743 self.mean_ = self.pca.mean_ 744 745 return self 746 747 def transform( 748 self, 749 snpobj: Optional['SNPObject'] = None, 750 average_strands: Optional[bool] = None, 751 samples_subset: Optional[Union[int, List]] = None, 752 snps_subset: Optional[Union[int, List]] = None 753 ): 754 """ 755 Apply dimensionality reduction to the input SNP data stored in the provided `snpobj` using the fitted model. 756 757 Args: 758 snpobj (SNPObject, optional): 759 A SNPObject instance. If None, defaults to `self.snpobj`. 760 average_strands (bool, optional): 761 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 762 If None, defaults to `self.average_strands`. 763 samples_subset (int or list of int, optional): 764 Subset of samples to include, as an integer for the first samples or a list of sample indices. 765 If None, defaults to `self.samples_subset`. 
766 snps_subset (int or list of int, optional): 767 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 768 If None, defaults to `self.snps_subset`. 769 770 Returns: 771 **tensor or array of shape (n_samples, n_components):** 772 The transformed SNP data projected onto the `n_components_` principal components, 773 stored in `self.X_new_`. 774 """ 775 # Retrieve or update the data to transform 776 if snpobj is not None or self.X_ is None: 777 self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset) 778 779 # Apply transformation using the fitted PCA model 780 return self.pca.transform(self.X_) 781 782 def fit_transform(self, snpobj: Optional['SNPObject'] = None, average_strands: Optional[bool] = None, 783 samples_subset: Optional[Union[int, List]] = None, snps_subset: Optional[Union[int, List]] = None): 784 """ 785 Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction 786 on the same SNP data. 787 788 Args: 789 snpobj (SNPObject, optional): 790 A SNPObject instance. If None, defaults to `self.snpobj`. 791 average_strands (bool, optional): 792 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 793 If None, defaults to `self.average_strands`. 794 samples_subset (int or list of int, optional): 795 Subset of samples to include, as an integer for the first samples or a list of sample indices. 796 If None, defaults to `self.samples_subset`. 797 snps_subset (int or list of int, optional): 798 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 799 If None, defaults to `self.snps_subset`. 800 801 Returns: 802 **tensor or array of shape (n_samples, n_components):** 803 The transformed SNP data projected onto the `n_components_` principal components, 804 stored in `self.X_new_`. 805 """ 806 self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset) 807 self.X_new_ = self.pca.fit_transform(self.X_) 808 809 # Update attributes based on the fitted model 810 self.n_components_ = self.pca.n_components_ 811 self.components_ = self.pca.components_ 812 self.mean_ = self.pca.mean_ 813 814 return self.X_new_
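A quick sketch of the sign convention `_svd_flip` enforces, run on a small synthetic matrix (illustrative only):

```python
import torch

X = torch.randn(8, 4)
U, S, Vt = torch.linalg.svd(X - X.mean(dim=0), full_matrices=False)

U, Vt = _svd_flip(U, Vt)

# For each column of U, the entry with the largest magnitude is now non-negative,
# so repeated fits (or sign-ambiguous SVD backends) give the same orientation.
col_max_rows = torch.argmax(torch.abs(U), dim=0)
assert torch.all(U[col_max_rows, torch.arange(U.shape[1])] >= 0)
```

Because the sign of each singular-vector pair is mathematically arbitrary, pinning it to the largest-magnitude loading keeps projections comparable across runs.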
```python
class TorchPCA:
    # The constructor, properties, and public methods are documented individually below.

    def __getitem__(self, key):
        """Access an attribute of the class using square bracket notation, similar to a dictionary."""
        try:
            return getattr(self, key)
        except AttributeError:
            raise KeyError(f'Invalid key: {key}')

    def __setitem__(self, key, value):
        """Set an attribute of the class using square bracket notation, similar to a dictionary."""
        try:
            setattr(self, key, value)
        except AttributeError:
            raise KeyError(f'Invalid key: {key}')

    def _fit(self, X: torch.Tensor) -> Tuple:
        """
        Internal method to fit the PCA model to the data `X`.

        Args:
            X (tensor of shape (n_samples, n_snps)):
                Input SNP data used for fitting the model.

        Returns:
            Tuple: U, S, and Vt matrices from the SVD, where:
                - `U` has shape (n_samples, n_components).
                - `S` contains singular values (n_components).
                - `Vt` has shape (n_components, n_snps) and represents the principal components.
        """
        n_samples, n_snps = X.shape
        self.n_components_ = min(self.n_components or min(X.shape), min(X.shape))

        if self.n_components_ > min(X.shape):
            raise ValueError(f"n_components should be <= min(n_samples: {n_samples}, n_snps: {n_snps})")

        # Compute the mean to center the data
        self.mean_ = torch.mean(X, dim=0)

        # Choose SVD method based on the fitting type
        if self.fitting == "full":
            U, S, Vt = torch.linalg.svd(X - self.mean_, full_matrices=True)
        elif self.fitting == "reduced":
            U, S, Vt = torch.linalg.svd(X - self.mean_, full_matrices=False)
        elif self.fitting == "lowrank":
            U, S, V = torch.svd_lowrank(X, q=self.n_components_, M=self.mean_)
            Vt = V.mT  # Transpose V to get Vt
        else:
            raise ValueError(f"Unrecognized fitting method: {self.fitting}")

        # Select the first `n_components` columns and singular values
        U = U[:, :self.n_components_]
        S = S[:self.n_components_]
        Vt = Vt[:self.n_components_]

        # Ensure deterministic output for U and Vt signs
        U, Vt = _svd_flip(U, Vt)
        self.components_ = Vt

        return U, S, Vt
```
A class for GPU-based Principal Component Analysis (PCA) with PyTorch tensors.

This implementation leverages GPU acceleration to achieve significant performance improvements, being up to 25 times faster than `sklearn.decomposition.PCA` when running on a compatible GPU.
```python
def __init__(self, n_components: int = 2, fitting: str = 'reduced'):
    self.__n_components = n_components
    self.__fitting = fitting
    self.__n_components_ = None
    self.__components_ = None
    self.__mean_ = None
    self.__X_new_ = None  # Store transformed SNP data
```
Arguments:
- n_components (int, default=2): The number of principal components. If None, defaults to the minimum of `n_samples` and `n_snps`.
- fitting (str, default='reduced'): The fitting approach for the SVD computation. Options:
  - `'full'`: Full Singular Value Decomposition (SVD).
  - `'reduced'`: Economy SVD.
  - `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution.
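As a usage sketch of the constructor options above, the following fits `TorchPCA` on a small synthetic genotype-like matrix (the data and sizes are made up for illustration):

```python
import torch

# Synthetic stand-in for an (n_samples, n_snps) genotype matrix.
X = torch.randint(0, 3, (100, 500)).float()

pca = TorchPCA(n_components=2, fitting='reduced')  # economy SVD
embedding = pca.fit_transform(X)                   # shape: (100, 2)
print(embedding.shape, pca.components_.shape)      # (100, 2) and (2, 500)
```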
```python
@property
def n_components_(self) -> Optional[int]:
    return self.__n_components_
```
Retrieve `n_components_`.

Returns:
**int:** The effective number of components retained after fitting, calculated as `min(self.n_components, min(n_samples, n_snps))`.
```python
@property
def components_(self) -> Optional[torch.Tensor]:
    return self.__components_
```
Retrieve `components_`.

Returns:
**tensor of shape (n_components_, n_snps):** Matrix of principal components, where each row is a principal component vector.
```python
@property
def mean_(self) -> Optional[torch.Tensor]:
    return self.__mean_
```
Retrieve `mean_`.

Returns:
**tensor of shape (n_snps,):** Per-feature mean vector of the input data used for centering.
```python
@property
def X_new_(self) -> Optional[Union[torch.Tensor, np.ndarray]]:
    return self.__X_new_
```
Retrieve `X_new_`.

Returns:
**tensor of shape (n_samples, n_components_):** The transformed SNP data projected onto the `n_components_` principal components.
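Together with the scores in `X_new_`, the stored `mean_` and `components_` give a rank-`n_components_` reconstruction of the input. A small synthetic sketch (exact only when the retained components span the full data rank):

```python
import torch

X = torch.randn(50, 120)
pca = TorchPCA(n_components=10)
scores = pca.fit_transform(X)

# Rank-10 reconstruction from the scores, components, and stored mean.
X_hat = scores @ pca.components_ + pca.mean_
print(torch.linalg.norm(X - X_hat) / torch.linalg.norm(X))  # relative reconstruction error
```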
```python
def copy(self) -> 'TorchPCA':
    return copy.copy(self)
```
Create and return a copy of `self`.

Returns:
**TorchPCA:** A new instance of the current object.
```python
def fit(self, X: torch.Tensor) -> 'TorchPCA':
    self._fit(X)
    return self
```
Fit the model to the input SNP data.

Arguments:
- X (tensor of shape (n_samples, n_snps)): The SNP data matrix to fit the model.

Returns:
**TorchPCA:** The fitted instance of `self`.
```python
def transform(self, X: torch.Tensor) -> torch.Tensor:
    if self.components_ is None or self.mean_ is None:
        raise ValueError("The PCA model must be fitted before calling `transform`.")

    self.X_new_ = torch.matmul(X - self.mean_, self.components_.T)
    return self.X_new_
```
Apply dimensionality reduction to the input SNP data using the fitted model.

Arguments:
- X (tensor of shape (n_samples, n_snps)): The SNP data matrix to be transformed.

Returns:
**tensor of shape (n_samples, n_components_):** The transformed SNP data projected onto the `n_components_` principal components, stored in `self.X_new_`.
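A minimal sketch of the fit/transform split on synthetic matrices: the held-out samples are centered with the training `mean_` and projected onto the training `components_`:

```python
import torch

X_train = torch.randn(80, 300)   # illustrative training matrix
X_test = torch.randn(20, 300)    # held-out samples with the same SNPs

pca = TorchPCA(n_components=5).fit(X_train)

# Held-out samples are centered with the training mean and projected
# onto the training principal components.
scores = pca.transform(X_test)   # shape: (20, 5)
assert torch.allclose(scores, (X_test - pca.mean_) @ pca.components_.T)
```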
```python
def fit_transform(self, X):
    U, S, _ = self._fit(X)
    self.X_new_ = U * S.unsqueeze(0)
    return self.X_new_
```
Fit the model to the SNP data and apply dimensionality reduction on the same SNP data.

Arguments:
- X (tensor of shape (n_samples, n_snps)): The SNP data matrix used for both fitting and transformation.

Returns:
**tensor of shape (n_samples, n_components_):** The transformed SNP data projected onto the `n_components_` principal components, stored in `self.X_new_`.
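Since `fit_transform` returns `U * S` from the SVD of the centered data, it should agree (up to floating-point error) with centering the same data and projecting it onto `components_`. A synthetic sanity check:

```python
import torch

X = torch.randn(60, 200)       # illustrative data

pca = TorchPCA(n_components=3)
scores = pca.fit_transform(X)  # U * S for the top 3 components

# Equivalent projection: center with the fitted mean, multiply by the components.
projected = (X - pca.mean_) @ pca.components_.T
assert torch.allclose(scores, projected, atol=1e-4)
```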
```python
class PCA:
    # Property getters/setters for `snpobj`, `backend`, `n_components`, `fitting`, `device`,
    # `average_strands`, `samples_subset`, `snps_subset`, `n_components_`, `components_`,
    # `mean_`, `X_`, and `X_new_` follow the same pattern as in TorchPCA; the constructor
    # and public members are documented individually below.

    def copy(self) -> 'PCA':
        return copy.copy(self)

    def _process_device_argument(self, device: str):
        """Process the device argument to map user-friendly device names to PyTorch device specifications."""
        if isinstance(device, str):
            device_lower = device.lower()
            if device_lower in ['cpu']:
                return torch.device('cpu')
            elif device_lower in ['gpu', 'cuda']:
                if torch.cuda.is_available():
                    return torch.device('cuda')
                else:
                    warnings.warn("CUDA is not available; using CPU instead.")
                    return torch.device('cpu')
            elif device_lower.startswith('cuda:'):
                if torch.cuda.is_available():
                    return torch.device(device_lower)
                else:
                    warnings.warn(f"CUDA is not available; requested device '{device}' is not available. Using CPU instead.")
                    return torch.device('cpu')
            else:
                raise ValueError(f"Unknown device type: '{device}'. Please use 'CPU', 'GPU', 'cuda', or 'cuda:<index>'.")
        elif isinstance(device, torch.device):
            return device
        else:
            raise TypeError(f"Device must be a string or torch.device, got {type(device)}.")

    def _get_data_from_snpobj(
        self,
        snpobj: Optional['SNPObject'] = None,
        average_strands: Optional[bool] = None,
        samples_subset: Optional[Union[int, List]] = None,
        snps_subset: Optional[Union[int, List]] = None
    ) -> Union[np.ndarray, torch.Tensor]:
        """Retrieve and prepare SNP data for PCA analysis, with options for selecting subsets and handling strands."""
        if snpobj is None:
            snpobj = self.snpobj
        if average_strands is None:
            average_strands = self.average_strands
        if samples_subset is None:
            samples_subset = self.samples_subset
        if snps_subset is None:
            snps_subset = self.snps_subset

        if snpobj.calldata_gt.ndim == 2:
            X = np.transpose(snpobj.calldata_gt.astype(float), (1, 0))
        elif snpobj.calldata_gt.ndim == 3:
            X = np.transpose(snpobj.calldata_gt.astype(float), (1, 0, 2))

            if average_strands:
                X = np.mean(X, axis=2)
            else:
                X = np.reshape(X, (-1, X.shape[1]))
        else:
            raise ValueError(f"Invalid shape for `calldata_gt`: expected a 2D or 3D array, but got {snpobj.calldata_gt.ndim}D array.")

        # Handle sample and SNP subsets
        if isinstance(samples_subset, int):
            X = X[:samples_subset]
        elif isinstance(samples_subset, list):
            X = X[samples_subset]

        if isinstance(snps_subset, int):
            X = X[:, :snps_subset]
        elif isinstance(snps_subset, list):
            X = X[:, snps_subset]

        if self.backend == "pytorch":
            print(f"Converting data to PyTorch tensor on device {self.device}")
            X = torch.from_numpy(X).to(self.device)

        return X

    def fit(
        self,
        snpobj: Optional['SNPObject'] = None,
        average_strands: Optional[bool] = None,
        samples_subset: Optional[Union[int, List]] = None,
        snps_subset: Optional[Union[int, List]] = None
    ) -> 'PCA':
        self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset)
        self.pca.fit(self.X_)

        # Update attributes based on the fitted model
        self.n_components_ = self.pca.n_components_
        self.components_ = self.pca.components_
        self.mean_ = self.pca.mean_

        return self

    def transform(
        self,
        snpobj: Optional['SNPObject'] = None,
        average_strands: Optional[bool] = None,
        samples_subset: Optional[Union[int, List]] = None,
        snps_subset: Optional[Union[int, List]] = None
    ):
        # Retrieve or update the data to transform
        if snpobj is not None or self.X_ is None:
            self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset)

        # Apply transformation using the fitted PCA model
        return self.pca.transform(self.X_)

    def fit_transform(self, snpobj: Optional['SNPObject'] = None, average_strands: Optional[bool] = None,
                      samples_subset: Optional[Union[int, List]] = None, snps_subset: Optional[Union[int, List]] = None):
        self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset)
        self.X_new_ = self.pca.fit_transform(self.X_)

        # Update attributes based on the fitted model
        self.n_components_ = self.pca.n_components_
        self.components_ = self.pca.components_
        self.mean_ = self.pca.mean_

        return self.X_new_
```
A class for Principal Component Analysis (PCA) on SNP data stored in a `snputils.snp.genobj.SNPObject`. This class employs either [sklearn.decomposition.PCA](https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html) or the custom `TorchPCA` implementation for efficient GPU-accelerated analysis.

The PCA class supports both separate and averaged strand processing for SNP data. If the `snpobj` parameter is provided during instantiation, the `fit_transform` method will be automatically called, applying PCA to transform the data according to the selected configuration.
```python
def __init__(
    self,
    snpobj: Optional['SNPObject'] = None,
    backend: str = 'pytorch',
    n_components: int = 2,
    fitting: str = 'full',
    device: str = 'cpu',
    average_strands: bool = True,
    samples_subset: Optional[Union[int, List]] = None,
    snps_subset: Optional[Union[int, List]] = None
):
    self.__snpobj = snpobj
    self.__backend = backend
    self.__n_components = n_components
    self.__fitting = fitting
    self.__device = self._process_device_argument(device)
    self.__average_strands = average_strands
    self.__samples_subset = samples_subset
    self.__snps_subset = snps_subset
    self.__X_ = None
    self.__X_new_ = None  # Store transformed SNP data
    self.__n_components_ = None
    self.__components_ = None
    self.__mean_ = None

    # Initialize PCA backend
    if self.backend == "pytorch":
        self.pca = TorchPCA(n_components=self.n_components, fitting=self.fitting)
    elif self.backend == "sklearn":
        self.pca = skPCA(n_components=self.n_components)
    else:
        raise ValueError("Unknown backend for PCA: ", backend)

    # Fit and transform if a `snpobj` is provided
    if self.snpobj is not None:
        self.fit_transform(snpobj)
```
Arguments:
- snpobj (SNPObject, optional): A SNPObject instance.
- backend (str, default='pytorch'): The backend to use (`'sklearn'` or `'pytorch'`). Default is 'pytorch'.
- n_components (int, default=2): The number of principal components. Default is 2.
- fitting (str, default='full'): The fitting approach to use for the SVD computation (only for `backend='pytorch'`). `'full'`: Full Singular Value Decomposition (SVD). `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution. Default is 'full'.
- device (str, default='cpu'): Device to use (`'cpu'`, `'gpu'`, `'cuda'`, or `'cuda:<index>'`). Default is 'cpu'.
- average_strands (bool, default=True): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
- samples_subset (int or list of int, optional): Subset of samples to include, as an integer for the first samples or a list of sample indices.
- snps_subset (int or list of int, optional): Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices.
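As an illustration of how these arguments combine (a sketch only; no SNPObject is passed, so no fitting happens at construction):

from snputils.processing.pca import PCA

# scikit-learn backend on CPU; `fitting` only applies to backend='pytorch'
pca_sk = PCA(backend='sklearn', n_components=10)

# PyTorch backend with the faster low-rank approximation on the first GPU
pca_torch = PCA(backend='pytorch', n_components=10, fitting='lowrank', device='cuda:0')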
    @property
    def fitting(self) -> str:
        """
        Retrieve `fitting`.

        Returns:
            **str:**
                The fitting approach to use for the SVD computation (only for `backend='pytorch'`).
                - `'full'`: Full Singular Value Decomposition (SVD).
                - `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution.
        """
        return self.__fitting
Retrieve `fitting`.
Returns:
str: The fitting approach to use for the SVD computation (only for `backend='pytorch'`).
- `'full'`: Full Singular Value Decomposition (SVD).
- `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution.
    @property
    def average_strands(self) -> bool:
        """
        Retrieve `average_strands`.

        Returns:
            **bool:**
                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
        """
        return self.__average_strands
Retrieve `average_strands`.
Returns:
bool: True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
    @property
    def samples_subset(self) -> Optional[Union[int, List[int]]]:
        """
        Retrieve `samples_subset`.

        Returns:
            **int or list of int:**
                Subset of samples to include, as an integer for the first samples or a list of sample indices.
        """
        return self.__samples_subset
Retrieve `samples_subset`.
Returns:
int or list of int: Subset of samples to include, as an integer for the first samples or a list of sample indices.
    @property
    def snps_subset(self) -> Optional[Union[int, List[int]]]:
        """
        Retrieve `snps_subset`.

        Returns:
            **int or list of int:** Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices.
        """
        return self.__snps_subset
Retrieve `snps_subset`.
Returns:
int or list of int: Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices.
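Both subset parameters accept either form, for example (illustrative values only; the SNPObject itself is assumed to exist elsewhere):

# An integer keeps the first N samples/SNPs...
pca = PCA(n_components=2, samples_subset=500, snps_subset=10_000)

# ...while a list selects explicit indices
pca = PCA(n_components=2, samples_subset=[0, 5, 42], snps_subset=list(range(0, 20_000, 2)))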
    @property
    def n_components_(self) -> Optional[int]:
        """
        Retrieve `n_components_`.

        Returns:
            **int:**
                The effective number of components retained after fitting,
                calculated as `min(self.n_components, min(n_samples, n_snps))`.
        """
        return self.__n_components_
Retrieve `n_components_`.
Returns:
int: The effective number of components retained after fitting, calculated as `min(self.n_components, min(n_samples, n_snps))`.
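For example, with a hypothetical matrix of 100 samples and 50,000 SNPs:

# n_components_ = min(self.n_components, min(n_samples, n_snps))
min(2, min(100, 50_000))     # -> 2   (the request is satisfied)
min(500, min(100, 50_000))   # -> 100 (capped by the number of samples)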
    @property
    def components_(self) -> Optional[Union[torch.Tensor, np.ndarray]]:
        """
        Retrieve `components_`.

        Returns:
            **tensor or array of shape (n_components_, n_snps):**
                Matrix of principal components, where each row is a principal component vector.
        """
        return self.__components_
Retrieve `components_`.
Returns:
tensor or array of shape (n_components_, n_snps): Matrix of principal components, where each row is a principal component vector.
    @property
    def mean_(self) -> Optional[Union[torch.Tensor, np.ndarray]]:
        """
        Retrieve `mean_`.

        Returns:
            **tensor or array of shape (n_snps,):**
                Per-feature mean vector of the input data used for centering.
        """
        return self.__mean_
Retrieve `mean_`.
Returns:
tensor or array of shape (n_snps,): Per-feature mean vector of the input data used for centering.
    @property
    def X_(self) -> Optional[Union[torch.Tensor, np.ndarray]]:
        """
        Retrieve `X_`.

        Returns:
            **tensor or array of shape (n_samples, n_snps):**
                The SNP data matrix used to fit the model.
        """
        return self.__X_
Retrieve `X_`.
Returns:
tensor or array of shape (n_samples, n_snps): The SNP data matrix used to fit the model.
    @property
    def X_new_(self) -> Optional[Union[torch.Tensor, np.ndarray]]:
        """
        Retrieve `X_new_`.

        Returns:
            **tensor or array of shape (n_samples, n_components_):**
                The transformed SNP data projected onto the `n_components_` principal components.
        """
        return self.__X_new_
Retrieve `X_new_`.
Returns:
tensor or array of shape (n_samples, n_components_): The transformed SNP data projected onto the `n_components_` principal components.
    def copy(self) -> 'PCA':
        """
        Create and return a copy of `self`.

        Returns:
            **PCA:**
                A new instance of the current object.
        """
        return copy.copy(self)
Create and return a copy of `self`.
Returns:
PCA: A new instance of the current object.
    def fit(
        self,
        snpobj: Optional['SNPObject'] = None,
        average_strands: Optional[bool] = None,
        samples_subset: Optional[Union[int, List]] = None,
        snps_subset: Optional[Union[int, List]] = None
    ) -> 'PCA':
        """
        Fit the model to the input SNP data stored in the provided `snpobj`.

        Args:
            snpobj (SNPObject, optional):
                A SNPObject instance. If None, defaults to `self.snpobj`.
            average_strands (bool, optional):
                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
                If None, defaults to `self.average_strands`.
            samples_subset (int or list of int, optional):
                Subset of samples to include, as an integer for the first samples or a list of sample indices.
                If None, defaults to `self.samples_subset`.
            snps_subset (int or list of int, optional):
                Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices.
                If None, defaults to `self.snps_subset`.

        Returns:
            **PCA:**
                The fitted instance of `self`.
        """
        self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset)
        self.pca.fit(self.X_)

        # Update attributes based on the fitted model
        self.n_components_ = self.pca.n_components_
        self.components_ = self.pca.components_
        self.mean_ = self.pca.mean_

        return self
Fit the model to the input SNP data stored in the provided `snpobj`.
Arguments:
- snpobj (SNPObject, optional): A SNPObject instance. If None, defaults to `self.snpobj`.
- average_strands (bool, optional): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. If None, defaults to `self.average_strands`.
- samples_subset (int or list of int, optional): Subset of samples to include, as an integer for the first samples or a list of sample indices. If None, defaults to `self.samples_subset`.
- snps_subset (int or list of int, optional): Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. If None, defaults to `self.snps_subset`.
Returns:
PCA: The fitted instance of `self`.
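A short sketch of `fit` in isolation (assuming `snpobj_train` is an existing SNPObject):

pca = PCA(backend='sklearn', n_components=2)
pca.fit(snpobj_train)        # returns the fitted instance
print(pca.n_components_)     # effective number of components retained
print(pca.mean_.shape)       # per-SNP mean used for centering, shape (n_snps,)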
    def transform(
        self,
        snpobj: Optional['SNPObject'] = None,
        average_strands: Optional[bool] = None,
        samples_subset: Optional[Union[int, List]] = None,
        snps_subset: Optional[Union[int, List]] = None
    ):
        """
        Apply dimensionality reduction to the input SNP data stored in the provided `snpobj` using the fitted model.

        Args:
            snpobj (SNPObject, optional):
                A SNPObject instance. If None, defaults to `self.snpobj`.
            average_strands (bool, optional):
                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
                If None, defaults to `self.average_strands`.
            samples_subset (int or list of int, optional):
                Subset of samples to include, as an integer for the first samples or a list of sample indices.
                If None, defaults to `self.samples_subset`.
            snps_subset (int or list of int, optional):
                Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices.
                If None, defaults to `self.snps_subset`.

        Returns:
            **tensor or array of shape (n_samples, n_components):**
                The transformed SNP data projected onto the `n_components_` principal components,
                stored in `self.X_new_`.
        """
        # Retrieve or update the data to transform
        if snpobj is not None or self.X_ is None:
            self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset)

        # Apply transformation using the fitted PCA model
        return self.pca.transform(self.X_)
Apply dimensionality reduction to the input SNP data stored in the provided `snpobj` using the fitted model.
Arguments:
- snpobj (SNPObject, optional): A SNPObject instance. If None, defaults to `self.snpobj`.
- average_strands (bool, optional): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. If None, defaults to `self.average_strands`.
- samples_subset (int or list of int, optional): Subset of samples to include, as an integer for the first samples or a list of sample indices. If None, defaults to `self.samples_subset`.
- snps_subset (int or list of int, optional): Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. If None, defaults to `self.snps_subset`.
Returns:
tensor or array of shape (n_samples, n_components): The transformed SNP data projected onto the `n_components_` principal components, stored in `self.X_new_`.
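After fitting, `transform` can reuse the cached data or project a new SNPObject with the same components, for example (a sketch; `snpobj_train` and `snpobj_test` are assumed to be existing SNPObjects):

pca = PCA(n_components=2)
pca.fit(snpobj_train)

train_proj = pca.transform()            # no argument: reuses the data stored in X_ during fit
test_proj = pca.transform(snpobj_test)  # new SNPObject: projected onto the fitted components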
    def fit_transform(self, snpobj: Optional['SNPObject'] = None, average_strands: Optional[bool] = None,
                      samples_subset: Optional[Union[int, List]] = None, snps_subset: Optional[Union[int, List]] = None):
        """
        Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction
        on the same SNP data.

        Args:
            snpobj (SNPObject, optional):
                A SNPObject instance. If None, defaults to `self.snpobj`.
            average_strands (bool, optional):
                True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
                If None, defaults to `self.average_strands`.
            samples_subset (int or list of int, optional):
                Subset of samples to include, as an integer for the first samples or a list of sample indices.
                If None, defaults to `self.samples_subset`.
            snps_subset (int or list of int, optional):
                Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices.
                If None, defaults to `self.snps_subset`.

        Returns:
            **tensor or array of shape (n_samples, n_components):**
                The transformed SNP data projected onto the `n_components_` principal components,
                stored in `self.X_new_`.
        """
        self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset)
        self.X_new_ = self.pca.fit_transform(self.X_)

        # Update attributes based on the fitted model
        self.n_components_ = self.pca.n_components_
        self.components_ = self.pca.components_
        self.mean_ = self.pca.mean_

        return self.X_new_
Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data.
Arguments:
- snpobj (SNPObject, optional): A SNPObject instance. If None, defaults to `self.snpobj`.
- average_strands (bool, optional): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. If None, defaults to `self.average_strands`.
- samples_subset (int or list of int, optional): Subset of samples to include, as an integer for the first samples or a list of sample indices. If None, defaults to `self.samples_subset`.
- snps_subset (int or list of int, optional): Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. If None, defaults to `self.snps_subset`.
Returns:
tensor or array of shape (n_samples, n_components): The transformed SNP data projected onto the `n_components_` principal components, stored in `self.X_new_`.
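Finally, a one-shot sketch of `fit_transform` (again assuming an existing SNPObject; the column split below simply indexes the returned matrix):

pca = PCA(backend='pytorch', n_components=2, device='cpu')
embedding = pca.fit_transform(snpobj, average_strands=True)

pc1, pc2 = embedding[:, 0], embedding[:, 1]   # the first two principal components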