snputils.processing.pca
1import warnings 2import torch 3import numpy as np 4import copy 5from typing import Tuple, Optional, Union, List 6from sklearn.decomposition import PCA as skPCA 7 8from snputils.snp.genobj.snpobj import SNPObject 9 10 11def _svd_flip(u, v, u_based_decision=True): 12 """ 13 Sign correction to ensure deterministic output from SVD. 14 15 Adjusts the columns of u and the rows of v such that the loadings in the 16 columns in u that are largest in absolute value are always positive. 17 """ 18 if u_based_decision: 19 # columns of u, rows of v 20 max_abs_cols = torch.argmax(torch.abs(u), axis=0) 21 signs = torch.sign(u[max_abs_cols, range(u.shape[1])]) 22 u *= signs 23 v *= signs[:, None] 24 else: 25 # rows of v, columns of u 26 max_abs_rows = torch.argmax(torch.abs(v), axis=1) 27 signs = torch.sign(v[range(v.shape[0]), max_abs_rows]) 28 u *= signs 29 v *= signs[:, None] 30 31 return u, v 32 33 34class TorchPCA: 35 """ 36 A class for GPU-based Principal Component Analysis (PCA) with PyTorch tensors. 37 38 This implementation leverages GPU acceleration to achieve significant performance improvements, 39 being up to 25 times faster than `sklearn.decomposition.PCA` when running on a compatible GPU. 40 """ 41 def __init__(self, n_components: int = 2, fitting: str = 'reduced'): 42 """ 43 Args: 44 n_components (int, default=2): 45 The number of principal components. If None, defaults to the minimum of `n_samples` and `n_snps`. 46 fitting (str, default='reduced'): 47 The fitting approach for the SVD computation. Options: 48 - `'full'`: Full Singular Value Decomposition (SVD). 49 - `'reduced'`: Economy SVD. 50 - `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution. 51 Default is 'reduced'. 52 """ 53 self.__n_components = n_components 54 self.__fitting = fitting 55 self.__n_components_ = None 56 self.__components_ = None 57 self.__mean_ = None 58 self.__X_new_ = None # Store transformed SNP data 59 60 def __getitem__(self, key): 61 """ 62 To access an attribute of the class using the square bracket notation, 63 similar to a dictionary. 64 """ 65 try: 66 return getattr(self, key) 67 except AttributeError: 68 raise KeyError(f'Invalid key: {key}') 69 70 def __setitem__(self, key, value): 71 """ 72 To set an attribute of the class using the square bracket notation, 73 similar to a dictionary. 74 """ 75 try: 76 setattr(self, key, value) 77 except AttributeError: 78 raise KeyError(f'Invalid key: {key}') 79 80 @property 81 def n_components(self) -> Optional[int]: 82 """ 83 Retrieve `n_components`. 84 85 Returns: 86 **int:** The number of principal components. 87 """ 88 return self.__n_components 89 90 @n_components.setter 91 def n_components(self, x: Optional[int]) -> None: 92 """ 93 Update `n_components`. 94 """ 95 if x is not None and (not isinstance(x, int) or x <= 0): 96 raise ValueError("n_components must be a positive integer or None.") 97 self.__n_components = x 98 99 @property 100 def fitting(self) -> str: 101 """ 102 Retrieve `fitting`. 103 104 Returns: 105 **str:** 106 The fitting approach for the SVD computation. 107 """ 108 return self.__fitting 109 110 @fitting.setter 111 def fitting(self, x: str) -> None: 112 """ 113 Update `fitting`. 114 """ 115 allowed = {"full", "reduced", "lowrank"} 116 if x not in allowed: 117 raise ValueError(f"fitting must be one of {sorted(allowed)}") 118 self.__fitting = x 119 120 @property 121 def n_components_(self) -> Optional[int]: 122 """ 123 Retrieve `n_components_`. 124 125 Returns: 126 **int:** 127 The effective number of components retained after fitting, 128 calculated as `min(self.n_components, min(n_samples, n_snps))`. 129 """ 130 return self.__n_components_ 131 132 @n_components_.setter 133 def n_components_(self, x: int) -> None: 134 """ 135 Update `n_components_`. 136 """ 137 self.__n_components_ = x 138 139 @property 140 def components_(self) -> Optional[torch.Tensor]: 141 """ 142 Retrieve `components_`. 143 144 Returns: 145 **tensor of shape (n_components_, n_snps):** 146 Matrix of principal components, where each row is a principal component vector. 147 """ 148 return self.__components_ 149 150 @components_.setter 151 def components_(self, x: torch.Tensor) -> None: 152 """ 153 Update `components_`. 154 """ 155 self.__components_ = x 156 157 @property 158 def mean_(self) -> Optional[torch.Tensor]: 159 """ 160 Retrieve `mean_`. 161 162 Returns: 163 **tensor of shape (n_snps,):** 164 Per-feature mean vector of the input data used for centering. 165 """ 166 return self.__mean_ 167 168 @mean_.setter 169 def mean_(self, x: torch.Tensor) -> None: 170 """ 171 Update `mean_`. 172 """ 173 self.__mean_ = x 174 175 @property 176 def X_new_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 177 """ 178 Retrieve `X_new_`. 179 180 Returns: 181 **tensor of shape (n_samples, n_components_):** 182 The transformed SNP data projected onto the `n_components_` principal components. 183 """ 184 return self.__X_new_ 185 186 @X_new_.setter 187 def X_new_(self, x: torch.Tensor) -> None: 188 """ 189 Update `X_new_`. 190 """ 191 self.__X_new_ = x 192 193 def copy(self) -> 'TorchPCA': 194 """ 195 Create and return a copy of `self`. 196 197 Returns: 198 **TorchPCA:** 199 A new instance of the current object. 200 """ 201 return copy.copy(self) 202 203 def _fit(self, X: torch.Tensor) -> Tuple: 204 """ 205 Internal method to fit the PCA model to the data `X`. 206 207 Args: 208 X (tensor of shape (n_samples, n_snps)): 209 Input SNP data used for fitting the model. 210 211 Returns: 212 Tuple: U, S, and Vt matrices from the SVD, where: 213 - `U` has shape (n_samples, n_components). 214 - `S` contains singular values (n_components). 215 - `Vt` has shape (n_components, n_snps) and represents the principal components. 216 """ 217 n_samples, n_snps = X.shape 218 self.n_components_ = min(self.n_components or min(X.shape), min(X.shape)) 219 220 if self.n_components_ > min(X.shape): 221 raise ValueError(f"n_components should be <= min(n_samples: {n_samples}, n_snps: {n_snps})") 222 223 # Compute the mean to center the data 224 self.mean_ = torch.mean(X, dim=0) 225 226 # Choose SVD method based on the fitting type 227 if self.fitting == "full": 228 U, S, Vt = torch.linalg.svd(X - self.mean_, full_matrices=True) 229 elif self.fitting == "reduced": 230 U, S, Vt = torch.linalg.svd(X - self.mean_, full_matrices=False) 231 elif self.fitting == "lowrank": 232 U, S, V = torch.svd_lowrank(X, q=self.n_components_, M=self.mean_) 233 Vt = V.mT # Transpose V to get Vt 234 else: 235 raise ValueError(f"Unrecognized fitting method: {self.fitting}") 236 237 # Select the first `n_components` columns and singular values 238 U = U[:, :self.n_components_] 239 S = S[:self.n_components_] 240 Vt = Vt[:self.n_components_] 241 242 # Ensure deterministic output for U and Vt signs 243 U, Vt = _svd_flip(U, Vt) 244 self.components_ = Vt 245 246 return U, S, Vt 247 248 def fit(self, X: torch.Tensor) -> 'TorchPCA': 249 """ 250 Fit the model to the input SNP data. 251 252 Args: 253 X (tensor of shape (n_samples, n_snps)): 254 The SNP data matrix to fit the model. 255 256 Returns: 257 **TorchPCA:** 258 The fitted instance of `self`. 259 """ 260 self._fit(X) 261 return self 262 263 def transform(self, X: torch.Tensor) -> torch.Tensor: 264 """ 265 Apply dimensionality reduction to the input SNP data using the fitted model. 266 267 Args: 268 X (tensor of shape (n_samples, n_snps)): 269 The SNP data matrix to be transformed. 270 271 Returns: 272 **tensor of shape (n_samples, n_components_):** 273 The transformed SNP data projected onto the `n_components_` principal components, 274 stored in `self.X_new_`. 275 """ 276 if self.components_ is None or self.mean_ is None: 277 raise ValueError("The PCA model must be fitted before calling `transform`.") 278 279 self.X_new_ = torch.matmul(X - self.mean_, self.components_.T) 280 return self.X_new_ 281 282 def fit_transform(self, X): 283 """ 284 Fit the model to the SNP data and apply dimensionality reduction on the same SNP data. 285 286 Args: 287 X (tensor of shape n_samples, n_snps): 288 The SNP data matrix used for both fitting and transformation. 289 290 Returns: 291 **tensor of shape (n_samples, n_components_):** 292 The transformed SNP data projected onto the `n_components_` principal components, 293 stored in `self.X_new_`. 294 """ 295 U, S, _ = self._fit(X) 296 self.X_new_ = U * S.unsqueeze(0) 297 return self.X_new_ 298 299 300class PCA: 301 """ 302 A class for Principal Component Analysis (PCA) on SNP data stored in a `snputils.snp.genobj.SNPObject`. 303 This class employs either 304 [sklearn.decomposition.PCA](https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html) 305 or the custom `TorchPCA` implementation for efficient GPU-accelerated analysis. 306 307 The PCA class supports both separate and averaged strand processing for SNP data. If the `snpobj` parameter is 308 provided during instantiation, the `fit_transform` method will be automatically called, 309 applying PCA to transform the data according to the selected configuration. 310 """ 311 def __init__( 312 self, 313 snpobj: Optional['SNPObject'] = None, 314 backend: str = 'pytorch', 315 n_components: int = 2, 316 fitting: str = 'full', 317 device: str = 'cpu', 318 average_strands: bool = True, 319 samples_subset: Optional[Union[int, List]] = None, 320 snps_subset: Optional[Union[int, List]] = None 321 ): 322 """ 323 Args: 324 snpobj (SNPObject, optional): 325 A SNPObject instance. 326 backend (str, default='pytorch'): 327 The backend to use (`'sklearn'` or `'pytorch'`). Default is 'pytorch'. 328 n_components (int, default=2): 329 The number of principal components. Default is 2. 330 fitting (str, default='full'): 331 The fitting approach to use for the SVD computation (only for `backend='pytorch'`). 332 - `'full'`: Full Singular Value Decomposition (SVD). 333 - `'reduced'`: Economy SVD. 334 - `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution. 335 Default is 'full'. 336 device (str, default='cpu'): 337 Device to use (`'cpu'`, `'gpu'`, `'cuda'`, or `'cuda:<index>'`). Default is 'cpu'. 338 average_strands (bool, default=True): 339 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 340 samples_subset (int or list of int, optional): 341 Subset of samples to include, as an integer for the first samples or a list of sample indices. 342 snps_subset (int or list of int, optional): 343 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 344 """ 345 self.__snpobj = snpobj 346 self.__backend = backend 347 self.__n_components = n_components 348 self.__fitting = fitting 349 self.__device = self._process_device_argument(device) 350 self.__average_strands = average_strands 351 self.__samples_subset = samples_subset 352 self.__snps_subset = snps_subset 353 self.__X_ = None 354 self.__X_new_ = None # Store transformed SNP data 355 self.__n_components_ = None 356 self.__components_ = None 357 self.__mean_ = None 358 359 # Initialize PCA backend 360 if self.backend == "pytorch": 361 self.pca = TorchPCA(n_components=self.n_components, fitting=self.fitting) 362 elif self.backend == "sklearn": 363 self.pca = skPCA(n_components=self.n_components) 364 else: 365 raise ValueError("Unknown backend for PCA: ", backend) 366 367 # Fit and transform if a `snpobj` is provided 368 if self.snpobj is not None: 369 self.fit_transform(snpobj) 370 371 @property 372 def snpobj(self) -> Optional['SNPObject']: 373 """ 374 Retrieve `snpobj`. 375 376 Returns: 377 **SNPObject:** A SNPObject instance. 378 """ 379 return self.__snpobj 380 381 @snpobj.setter 382 def snpobj(self, x) -> None: 383 """ 384 Update `snpobj`. 385 """ 386 self.__snpobj = x 387 388 @property 389 def backend(self) -> str: 390 """ 391 Retrieve `backend`. 392 393 Returns: 394 **str:** The backend to use (`'sklearn'` or `'pytorch'`). 395 """ 396 return self.__backend 397 398 @backend.setter 399 def backend(self, x: str) -> None: 400 """ 401 Update `backend`. 402 """ 403 self.__backend = x 404 405 @property 406 def n_components(self) -> int: 407 """ 408 Retrieve `n_components`. 409 410 Returns: 411 **int:** The number of principal components. 412 """ 413 return self.__n_components 414 415 @n_components.setter 416 def n_components(self, x: int) -> None: 417 """ 418 Update `n_components`. 419 """ 420 self.__n_components = x 421 422 @property 423 def fitting(self) -> str: 424 """ 425 Retrieve `fitting`. 426 427 Returns: 428 **str:** 429 The fitting approach to use for the SVD computation (only for `backend='pytorch'`). 430 - `'full'`: Full Singular Value Decomposition (SVD). 431 - `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution. 432 """ 433 return self.__fitting 434 435 @fitting.setter 436 def fitting(self, x: str) -> None: 437 """ 438 Update `fitting`. 439 """ 440 self.__fitting = x 441 442 @property 443 def device(self) -> torch.device: 444 """ 445 Retrieve `device`. 446 447 Returns: 448 **torch.device:** Device to use (`'cpu'`, `'gpu'`, `'cuda'`, or `'cuda:<index>'`). 449 """ 450 return self.__device 451 452 @device.setter 453 def device(self, x: str) -> None: 454 """ 455 Update `device`. 456 """ 457 self.__device = self._process_device_argument(x) 458 459 @property 460 def average_strands(self) -> bool: 461 """ 462 Retrieve `average_strands`. 463 464 Returns: 465 **bool:** 466 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 467 """ 468 return self.__average_strands 469 470 @average_strands.setter 471 def average_strands(self, x: bool) -> None: 472 """ 473 Update `average_strands`. 474 """ 475 self.__average_strands = x 476 477 @property 478 def samples_subset(self) -> Optional[Union[int, List[int]]]: 479 """ 480 Retrieve `samples_subset`. 481 482 Returns: 483 **int or list of int:** 484 Subset of samples to include, as an integer for the first samples or a list of sample indices. 485 """ 486 return self.__samples_subset 487 488 @samples_subset.setter 489 def samples_subset(self, x: Optional[Union[int, List[int]]]) -> None: 490 """ 491 Update `samples_subset`. 492 """ 493 self.__samples_subset = x 494 495 @property 496 def snps_subset(self) -> Optional[Union[int, List[int]]]: 497 """ 498 Retrieve `snps_subset`. 499 500 Returns: 501 **int or list of int:** Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 502 """ 503 return self.__snps_subset 504 505 @snps_subset.setter 506 def snps_subset(self, x: Optional[Union[int, List[int]]]) -> None: 507 """ 508 Update `snps_subset`. 509 """ 510 self.__snps_subset = x 511 512 @property 513 def n_components_(self) -> Optional[int]: 514 """ 515 Retrieve `n_components_`. 516 517 Returns: 518 **int:** 519 The effective number of components retained after fitting, 520 calculated as `min(self.n_components, min(n_samples, n_snps))`. 521 """ 522 return self.__n_components_ 523 524 @n_components_.setter 525 def n_components_(self, x: int) -> None: 526 """ 527 Update `n_components_`. 528 """ 529 self.__n_components_ = x 530 531 @property 532 def components_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 533 """ 534 Retrieve `components_`. 535 536 Returns: 537 **tensor or array of shape (n_components_, n_snps):** 538 Matrix of principal components, where each row is a principal component vector. 539 """ 540 return self.__components_ 541 542 @components_.setter 543 def components_(self, x: Union[torch.Tensor, np.ndarray]) -> None: 544 """ 545 Update `components_`. 546 """ 547 self.__components_ = x 548 549 @property 550 def mean_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 551 """ 552 Retrieve `mean_`. 553 554 Returns: 555 **tensor or array of shape (n_snps,):** 556 Per-feature mean vector of the input data used for centering. 557 """ 558 return self.__mean_ 559 560 @mean_.setter 561 def mean_(self, x: Union[torch.Tensor, np.ndarray]) -> None: 562 """ 563 Update `mean_`. 564 """ 565 self.__mean_ = x 566 567 @property 568 def X_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 569 """ 570 Retrieve `X_`. 571 572 Returns: 573 **tensor or array of shape (n_samples, n_snps):** 574 The SNP data matrix used to fit the model. 575 """ 576 return self.__X_ 577 578 @X_.setter 579 def X_(self, x: Union[torch.Tensor, np.ndarray]) -> None: 580 """ 581 Update `X_`. 582 """ 583 self.__X_ = x 584 585 @property 586 def X_new_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 587 """ 588 Retrieve `X_new_`. 589 590 Returns: 591 **tensor or array of shape (n_samples, n_components_):** 592 The transformed SNP data projected onto the `n_components_` principal components. 593 """ 594 return self.__X_new_ 595 596 @X_new_.setter 597 def X_new_(self, x: Union[torch.Tensor, np.ndarray]) -> None: 598 """ 599 Update `X_new_`. 600 """ 601 self.__X_new_ = x 602 603 def copy(self) -> 'PCA': 604 """ 605 Create and return a copy of `self`. 606 607 Returns: 608 **PCA:** 609 A new instance of the current object. 610 """ 611 return copy.copy(self) 612 613 def _process_device_argument(self, device: str): 614 """ 615 Process the device argument to map user-friendly device names to PyTorch device specifications. 616 617 Args: 618 device (str): Device specified by the user. 619 620 Returns: 621 torch.device: PyTorch device object. 622 """ 623 if isinstance(device, str): 624 device_lower = device.lower() 625 if device_lower in ['cpu']: 626 return torch.device('cpu') 627 elif device_lower in ['gpu', 'cuda']: 628 if torch.cuda.is_available(): 629 return torch.device('cuda') 630 else: 631 warnings.warn("CUDA is not available; using CPU instead.") 632 return torch.device('cpu') 633 elif device_lower.startswith('cuda:'): 634 if torch.cuda.is_available(): 635 return torch.device(device_lower) 636 else: 637 warnings.warn(f"CUDA is not available; requested device '{device}' is not available. Using CPU instead.") 638 return torch.device('cpu') 639 else: 640 raise ValueError(f"Unknown device type: '{device}'. Please use 'CPU', 'GPU', 'cuda', or 'cuda:<index>'.") 641 elif isinstance(device, torch.device): 642 return device 643 else: 644 raise TypeError(f"Device must be a string or torch.device, got {type(device)}.") 645 646 def _get_data_from_snpobj( 647 self, 648 snpobj: Optional['SNPObject'] = None, 649 average_strands: Optional[bool] = None, 650 samples_subset: Optional[Union[int, List]] = None, 651 snps_subset: Optional[Union[int, List]] = None 652 ) -> Union[np.ndarray, torch.Tensor]: 653 """ 654 Retrieve and prepare SNP data for PCA analysis, with options for selecting subsets and handling strands. 655 656 This method processes SNP data stored in an `SNPObject`, which may include averaging of paternal 657 and maternal strands or selecting subsets of samples and SNPs. The prepared data is formatted 658 for use in PCA, optionally converted to a PyTorch tensor if the backend is set to 'pytorch'. 659 660 Args: 661 snpobj (SNPObject, optional): 662 A SNPObject object instance. If None, defaults to `self.snpobj`. 663 average_strands (bool, optional): 664 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 665 If None, defaults to `self.average_strands`. 666 samples_subset (int or list of int, optional): 667 Subset of samples to include, as an integer for the first samples or a list of sample indices. 668 If None, defaults to `self.samples_subset`. 669 snps_subset (int or list of int, optional): 670 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 671 If None, defaults to `self.snps_subset`. 672 673 Returns: 674 numpy.ndarray or torch.Tensor: 675 The processed SNP data. If `backend` is set to 'pytorch', the data is returned as a 676 PyTorch tensor on the specified `device`. 677 """ 678 if snpobj is None: 679 snpobj = self.snpobj 680 if average_strands is None: 681 average_strands = self.average_strands 682 if samples_subset is None: 683 samples_subset = self.samples_subset 684 if snps_subset is None: 685 snps_subset = self.snps_subset 686 687 if snpobj.calldata_gt.ndim == 2: 688 X = np.transpose(snpobj.calldata_gt.astype(float), (1,0)) 689 elif snpobj.calldata_gt.ndim == 3: 690 X = np.transpose(snpobj.calldata_gt.astype(float), (1,0,2)) 691 692 if average_strands: 693 X = np.mean(X, axis=2) 694 else: 695 X = np.reshape(X, (-1, X.shape[1])) 696 else: 697 raise ValueError(f"Invalid shape for `calldata_gt`: expected a 2D or 3D array, but got {snpobj.calldata_gt.ndim}D array.") 698 699 # Handle sample and SNP subsets 700 if isinstance(samples_subset, int): 701 X = X[:samples_subset] 702 elif isinstance(samples_subset, list): 703 X = X[samples_subset] 704 705 if isinstance(snps_subset, int): 706 X = X[:, :snps_subset] 707 elif isinstance(snps_subset, list): 708 X = X[:, snps_subset] 709 710 if self.backend == "pytorch": 711 print(f"Converting data to PyTorch tensor on device {self.device}") 712 X = torch.from_numpy(X).to(self.device) 713 714 return X 715 716 def fit( 717 self, 718 snpobj: Optional['SNPObject'] = None, 719 average_strands: Optional[bool] = None, 720 samples_subset: Optional[Union[int, List]] = None, 721 snps_subset: Optional[Union[int, List]] = None 722 ) -> 'PCA': 723 """ 724 Fit the model to the input SNP data stored in the provided `snpobj`. 725 726 Args: 727 snpobj (SNPObject, optional): 728 A SNPObject instance. If None, defaults to `self.snpobj`. 729 average_strands (bool, optional): 730 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 731 If None, defaults to `self.average_strands`. 732 samples_subset (int or list of int, optional): 733 Subset of samples to include, as an integer for the first samples or a list of sample indices. 734 If None, defaults to `self.samples_subset`. 735 snps_subset (int or list of int, optional): 736 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 737 If None, defaults to `self.snps_subset`. 738 739 Returns: 740 **PCA:** 741 The fitted instance of `self`. 742 """ 743 self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset) 744 self.pca.fit(self.X_) 745 746 # Update attributes based on the fitted model 747 self.n_components_ = self.pca.n_components_ 748 self.components_ = self.pca.components_ 749 self.mean_ = self.pca.mean_ 750 751 return self 752 753 def transform( 754 self, 755 snpobj: Optional['SNPObject'] = None, 756 average_strands: Optional[bool] = None, 757 samples_subset: Optional[Union[int, List]] = None, 758 snps_subset: Optional[Union[int, List]] = None 759 ): 760 """ 761 Apply dimensionality reduction to the input SNP data stored in the provided `snpobj` using the fitted model. 762 763 Args: 764 snpobj (SNPObject, optional): 765 A SNPObject instance. If None, defaults to `self.snpobj`. 766 average_strands (bool, optional): 767 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 768 If None, defaults to `self.average_strands`. 769 samples_subset (int or list of int, optional): 770 Subset of samples to include, as an integer for the first samples or a list of sample indices. 771 If None, defaults to `self.samples_subset`. 772 snps_subset (int or list of int, optional): 773 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 774 If None, defaults to `self.snps_subset`. 775 776 Returns: 777 **tensor or array of shape (n_samples, n_components):** 778 The transformed SNP data projected onto the `n_components_` principal components, 779 stored in `self.X_new_`. 780 """ 781 # Retrieve or update the data to transform 782 if snpobj is not None or self.X_ is None: 783 self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset) 784 785 # Apply transformation using the fitted PCA model 786 return self.pca.transform(self.X_) 787 788 def fit_transform(self, snpobj: Optional['SNPObject'] = None, average_strands: Optional[bool] = None, 789 samples_subset: Optional[Union[int, List]] = None, snps_subset: Optional[Union[int, List]] = None): 790 """ 791 Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction 792 on the same SNP data. 793 794 Args: 795 snpobj (SNPObject, optional): 796 A SNPObject instance. If None, defaults to `self.snpobj`. 797 average_strands (bool, optional): 798 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 799 If None, defaults to `self.average_strands`. 800 samples_subset (int or list of int, optional): 801 Subset of samples to include, as an integer for the first samples or a list of sample indices. 802 If None, defaults to `self.samples_subset`. 803 snps_subset (int or list of int, optional): 804 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 805 If None, defaults to `self.snps_subset`. 806 807 Returns: 808 **tensor or array of shape (n_samples, n_components):** 809 The transformed SNP data projected onto the `n_components_` principal components, 810 stored in `self.X_new_`. 811 """ 812 self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset) 813 self.X_new_ = self.pca.fit_transform(self.X_) 814 815 # Update attributes based on the fitted model 816 self.n_components_ = self.pca.n_components_ 817 self.components_ = self.pca.components_ 818 self.mean_ = self.pca.mean_ 819 820 return self.X_new_
35class TorchPCA: 36 """ 37 A class for GPU-based Principal Component Analysis (PCA) with PyTorch tensors. 38 39 This implementation leverages GPU acceleration to achieve significant performance improvements, 40 being up to 25 times faster than `sklearn.decomposition.PCA` when running on a compatible GPU. 41 """ 42 def __init__(self, n_components: int = 2, fitting: str = 'reduced'): 43 """ 44 Args: 45 n_components (int, default=2): 46 The number of principal components. If None, defaults to the minimum of `n_samples` and `n_snps`. 47 fitting (str, default='reduced'): 48 The fitting approach for the SVD computation. Options: 49 - `'full'`: Full Singular Value Decomposition (SVD). 50 - `'reduced'`: Economy SVD. 51 - `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution. 52 Default is 'reduced'. 53 """ 54 self.__n_components = n_components 55 self.__fitting = fitting 56 self.__n_components_ = None 57 self.__components_ = None 58 self.__mean_ = None 59 self.__X_new_ = None # Store transformed SNP data 60 61 def __getitem__(self, key): 62 """ 63 To access an attribute of the class using the square bracket notation, 64 similar to a dictionary. 65 """ 66 try: 67 return getattr(self, key) 68 except AttributeError: 69 raise KeyError(f'Invalid key: {key}') 70 71 def __setitem__(self, key, value): 72 """ 73 To set an attribute of the class using the square bracket notation, 74 similar to a dictionary. 75 """ 76 try: 77 setattr(self, key, value) 78 except AttributeError: 79 raise KeyError(f'Invalid key: {key}') 80 81 @property 82 def n_components(self) -> Optional[int]: 83 """ 84 Retrieve `n_components`. 85 86 Returns: 87 **int:** The number of principal components. 88 """ 89 return self.__n_components 90 91 @n_components.setter 92 def n_components(self, x: Optional[int]) -> None: 93 """ 94 Update `n_components`. 95 """ 96 if x is not None and (not isinstance(x, int) or x <= 0): 97 raise ValueError("n_components must be a positive integer or None.") 98 self.__n_components = x 99 100 @property 101 def fitting(self) -> str: 102 """ 103 Retrieve `fitting`. 104 105 Returns: 106 **str:** 107 The fitting approach for the SVD computation. 108 """ 109 return self.__fitting 110 111 @fitting.setter 112 def fitting(self, x: str) -> None: 113 """ 114 Update `fitting`. 115 """ 116 allowed = {"full", "reduced", "lowrank"} 117 if x not in allowed: 118 raise ValueError(f"fitting must be one of {sorted(allowed)}") 119 self.__fitting = x 120 121 @property 122 def n_components_(self) -> Optional[int]: 123 """ 124 Retrieve `n_components_`. 125 126 Returns: 127 **int:** 128 The effective number of components retained after fitting, 129 calculated as `min(self.n_components, min(n_samples, n_snps))`. 130 """ 131 return self.__n_components_ 132 133 @n_components_.setter 134 def n_components_(self, x: int) -> None: 135 """ 136 Update `n_components_`. 137 """ 138 self.__n_components_ = x 139 140 @property 141 def components_(self) -> Optional[torch.Tensor]: 142 """ 143 Retrieve `components_`. 144 145 Returns: 146 **tensor of shape (n_components_, n_snps):** 147 Matrix of principal components, where each row is a principal component vector. 148 """ 149 return self.__components_ 150 151 @components_.setter 152 def components_(self, x: torch.Tensor) -> None: 153 """ 154 Update `components_`. 155 """ 156 self.__components_ = x 157 158 @property 159 def mean_(self) -> Optional[torch.Tensor]: 160 """ 161 Retrieve `mean_`. 162 163 Returns: 164 **tensor of shape (n_snps,):** 165 Per-feature mean vector of the input data used for centering. 166 """ 167 return self.__mean_ 168 169 @mean_.setter 170 def mean_(self, x: torch.Tensor) -> None: 171 """ 172 Update `mean_`. 173 """ 174 self.__mean_ = x 175 176 @property 177 def X_new_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 178 """ 179 Retrieve `X_new_`. 180 181 Returns: 182 **tensor of shape (n_samples, n_components_):** 183 The transformed SNP data projected onto the `n_components_` principal components. 184 """ 185 return self.__X_new_ 186 187 @X_new_.setter 188 def X_new_(self, x: torch.Tensor) -> None: 189 """ 190 Update `X_new_`. 191 """ 192 self.__X_new_ = x 193 194 def copy(self) -> 'TorchPCA': 195 """ 196 Create and return a copy of `self`. 197 198 Returns: 199 **TorchPCA:** 200 A new instance of the current object. 201 """ 202 return copy.copy(self) 203 204 def _fit(self, X: torch.Tensor) -> Tuple: 205 """ 206 Internal method to fit the PCA model to the data `X`. 207 208 Args: 209 X (tensor of shape (n_samples, n_snps)): 210 Input SNP data used for fitting the model. 211 212 Returns: 213 Tuple: U, S, and Vt matrices from the SVD, where: 214 - `U` has shape (n_samples, n_components). 215 - `S` contains singular values (n_components). 216 - `Vt` has shape (n_components, n_snps) and represents the principal components. 217 """ 218 n_samples, n_snps = X.shape 219 self.n_components_ = min(self.n_components or min(X.shape), min(X.shape)) 220 221 if self.n_components_ > min(X.shape): 222 raise ValueError(f"n_components should be <= min(n_samples: {n_samples}, n_snps: {n_snps})") 223 224 # Compute the mean to center the data 225 self.mean_ = torch.mean(X, dim=0) 226 227 # Choose SVD method based on the fitting type 228 if self.fitting == "full": 229 U, S, Vt = torch.linalg.svd(X - self.mean_, full_matrices=True) 230 elif self.fitting == "reduced": 231 U, S, Vt = torch.linalg.svd(X - self.mean_, full_matrices=False) 232 elif self.fitting == "lowrank": 233 U, S, V = torch.svd_lowrank(X, q=self.n_components_, M=self.mean_) 234 Vt = V.mT # Transpose V to get Vt 235 else: 236 raise ValueError(f"Unrecognized fitting method: {self.fitting}") 237 238 # Select the first `n_components` columns and singular values 239 U = U[:, :self.n_components_] 240 S = S[:self.n_components_] 241 Vt = Vt[:self.n_components_] 242 243 # Ensure deterministic output for U and Vt signs 244 U, Vt = _svd_flip(U, Vt) 245 self.components_ = Vt 246 247 return U, S, Vt 248 249 def fit(self, X: torch.Tensor) -> 'TorchPCA': 250 """ 251 Fit the model to the input SNP data. 252 253 Args: 254 X (tensor of shape (n_samples, n_snps)): 255 The SNP data matrix to fit the model. 256 257 Returns: 258 **TorchPCA:** 259 The fitted instance of `self`. 260 """ 261 self._fit(X) 262 return self 263 264 def transform(self, X: torch.Tensor) -> torch.Tensor: 265 """ 266 Apply dimensionality reduction to the input SNP data using the fitted model. 267 268 Args: 269 X (tensor of shape (n_samples, n_snps)): 270 The SNP data matrix to be transformed. 271 272 Returns: 273 **tensor of shape (n_samples, n_components_):** 274 The transformed SNP data projected onto the `n_components_` principal components, 275 stored in `self.X_new_`. 276 """ 277 if self.components_ is None or self.mean_ is None: 278 raise ValueError("The PCA model must be fitted before calling `transform`.") 279 280 self.X_new_ = torch.matmul(X - self.mean_, self.components_.T) 281 return self.X_new_ 282 283 def fit_transform(self, X): 284 """ 285 Fit the model to the SNP data and apply dimensionality reduction on the same SNP data. 286 287 Args: 288 X (tensor of shape n_samples, n_snps): 289 The SNP data matrix used for both fitting and transformation. 290 291 Returns: 292 **tensor of shape (n_samples, n_components_):** 293 The transformed SNP data projected onto the `n_components_` principal components, 294 stored in `self.X_new_`. 295 """ 296 U, S, _ = self._fit(X) 297 self.X_new_ = U * S.unsqueeze(0) 298 return self.X_new_
A class for GPU-based Principal Component Analysis (PCA) with PyTorch tensors.
This implementation leverages GPU acceleration to achieve significant performance improvements,
being up to 25 times faster than sklearn.decomposition.PCA
when running on a compatible GPU.
42 def __init__(self, n_components: int = 2, fitting: str = 'reduced'): 43 """ 44 Args: 45 n_components (int, default=2): 46 The number of principal components. If None, defaults to the minimum of `n_samples` and `n_snps`. 47 fitting (str, default='reduced'): 48 The fitting approach for the SVD computation. Options: 49 - `'full'`: Full Singular Value Decomposition (SVD). 50 - `'reduced'`: Economy SVD. 51 - `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution. 52 Default is 'reduced'. 53 """ 54 self.__n_components = n_components 55 self.__fitting = fitting 56 self.__n_components_ = None 57 self.__components_ = None 58 self.__mean_ = None 59 self.__X_new_ = None # Store transformed SNP data
Arguments:
- n_components (int, default=2): The number of principal components. If None, defaults to the minimum of
n_samples
andn_snps
. - fitting (str, default='reduced'): The fitting approach for the SVD computation. Options:
'full'
: Full Singular Value Decomposition (SVD).'reduced'
: Economy SVD.'lowrank'
: Low-rank approximation, which provides a faster but approximate solution. Default is 'reduced'.
121 @property 122 def n_components_(self) -> Optional[int]: 123 """ 124 Retrieve `n_components_`. 125 126 Returns: 127 **int:** 128 The effective number of components retained after fitting, 129 calculated as `min(self.n_components, min(n_samples, n_snps))`. 130 """ 131 return self.__n_components_
Retrieve n_components_
.
Returns:
int: The effective number of components retained after fitting, calculated as
min(self.n_components, min(n_samples, n_snps))
.
140 @property 141 def components_(self) -> Optional[torch.Tensor]: 142 """ 143 Retrieve `components_`. 144 145 Returns: 146 **tensor of shape (n_components_, n_snps):** 147 Matrix of principal components, where each row is a principal component vector. 148 """ 149 return self.__components_
Retrieve components_
.
Returns:
tensor of shape (n_components_, n_snps): Matrix of principal components, where each row is a principal component vector.
158 @property 159 def mean_(self) -> Optional[torch.Tensor]: 160 """ 161 Retrieve `mean_`. 162 163 Returns: 164 **tensor of shape (n_snps,):** 165 Per-feature mean vector of the input data used for centering. 166 """ 167 return self.__mean_
Retrieve mean_
.
Returns:
tensor of shape (n_snps,): Per-feature mean vector of the input data used for centering.
176 @property 177 def X_new_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 178 """ 179 Retrieve `X_new_`. 180 181 Returns: 182 **tensor of shape (n_samples, n_components_):** 183 The transformed SNP data projected onto the `n_components_` principal components. 184 """ 185 return self.__X_new_
Retrieve X_new_
.
Returns:
tensor of shape (n_samples, n_components_): The transformed SNP data projected onto the
n_components_
principal components.
194 def copy(self) -> 'TorchPCA': 195 """ 196 Create and return a copy of `self`. 197 198 Returns: 199 **TorchPCA:** 200 A new instance of the current object. 201 """ 202 return copy.copy(self)
Create and return a copy of self
.
Returns:
TorchPCA: A new instance of the current object.
249 def fit(self, X: torch.Tensor) -> 'TorchPCA': 250 """ 251 Fit the model to the input SNP data. 252 253 Args: 254 X (tensor of shape (n_samples, n_snps)): 255 The SNP data matrix to fit the model. 256 257 Returns: 258 **TorchPCA:** 259 The fitted instance of `self`. 260 """ 261 self._fit(X) 262 return self
Fit the model to the input SNP data.
Arguments:
- X (tensor of shape (n_samples, n_snps)): The SNP data matrix to fit the model.
Returns:
TorchPCA: The fitted instance of
self
.
264 def transform(self, X: torch.Tensor) -> torch.Tensor: 265 """ 266 Apply dimensionality reduction to the input SNP data using the fitted model. 267 268 Args: 269 X (tensor of shape (n_samples, n_snps)): 270 The SNP data matrix to be transformed. 271 272 Returns: 273 **tensor of shape (n_samples, n_components_):** 274 The transformed SNP data projected onto the `n_components_` principal components, 275 stored in `self.X_new_`. 276 """ 277 if self.components_ is None or self.mean_ is None: 278 raise ValueError("The PCA model must be fitted before calling `transform`.") 279 280 self.X_new_ = torch.matmul(X - self.mean_, self.components_.T) 281 return self.X_new_
Apply dimensionality reduction to the input SNP data using the fitted model.
Arguments:
- X (tensor of shape (n_samples, n_snps)): The SNP data matrix to be transformed.
Returns:
tensor of shape (n_samples, n_components_): The transformed SNP data projected onto the
n_components_
principal components, stored inself.X_new_
.
283 def fit_transform(self, X): 284 """ 285 Fit the model to the SNP data and apply dimensionality reduction on the same SNP data. 286 287 Args: 288 X (tensor of shape n_samples, n_snps): 289 The SNP data matrix used for both fitting and transformation. 290 291 Returns: 292 **tensor of shape (n_samples, n_components_):** 293 The transformed SNP data projected onto the `n_components_` principal components, 294 stored in `self.X_new_`. 295 """ 296 U, S, _ = self._fit(X) 297 self.X_new_ = U * S.unsqueeze(0) 298 return self.X_new_
Fit the model to the SNP data and apply dimensionality reduction on the same SNP data.
Arguments:
- X (tensor of shape n_samples, n_snps): The SNP data matrix used for both fitting and transformation.
Returns:
tensor of shape (n_samples, n_components_): The transformed SNP data projected onto the
n_components_
principal components, stored inself.X_new_
.
301class PCA: 302 """ 303 A class for Principal Component Analysis (PCA) on SNP data stored in a `snputils.snp.genobj.SNPObject`. 304 This class employs either 305 [sklearn.decomposition.PCA](https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html) 306 or the custom `TorchPCA` implementation for efficient GPU-accelerated analysis. 307 308 The PCA class supports both separate and averaged strand processing for SNP data. If the `snpobj` parameter is 309 provided during instantiation, the `fit_transform` method will be automatically called, 310 applying PCA to transform the data according to the selected configuration. 311 """ 312 def __init__( 313 self, 314 snpobj: Optional['SNPObject'] = None, 315 backend: str = 'pytorch', 316 n_components: int = 2, 317 fitting: str = 'full', 318 device: str = 'cpu', 319 average_strands: bool = True, 320 samples_subset: Optional[Union[int, List]] = None, 321 snps_subset: Optional[Union[int, List]] = None 322 ): 323 """ 324 Args: 325 snpobj (SNPObject, optional): 326 A SNPObject instance. 327 backend (str, default='pytorch'): 328 The backend to use (`'sklearn'` or `'pytorch'`). Default is 'pytorch'. 329 n_components (int, default=2): 330 The number of principal components. Default is 2. 331 fitting (str, default='full'): 332 The fitting approach to use for the SVD computation (only for `backend='pytorch'`). 333 - `'full'`: Full Singular Value Decomposition (SVD). 334 - `'reduced'`: Economy SVD. 335 - `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution. 336 Default is 'full'. 337 device (str, default='cpu'): 338 Device to use (`'cpu'`, `'gpu'`, `'cuda'`, or `'cuda:<index>'`). Default is 'cpu'. 339 average_strands (bool, default=True): 340 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 341 samples_subset (int or list of int, optional): 342 Subset of samples to include, as an integer for the first samples or a list of sample indices. 343 snps_subset (int or list of int, optional): 344 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 345 """ 346 self.__snpobj = snpobj 347 self.__backend = backend 348 self.__n_components = n_components 349 self.__fitting = fitting 350 self.__device = self._process_device_argument(device) 351 self.__average_strands = average_strands 352 self.__samples_subset = samples_subset 353 self.__snps_subset = snps_subset 354 self.__X_ = None 355 self.__X_new_ = None # Store transformed SNP data 356 self.__n_components_ = None 357 self.__components_ = None 358 self.__mean_ = None 359 360 # Initialize PCA backend 361 if self.backend == "pytorch": 362 self.pca = TorchPCA(n_components=self.n_components, fitting=self.fitting) 363 elif self.backend == "sklearn": 364 self.pca = skPCA(n_components=self.n_components) 365 else: 366 raise ValueError("Unknown backend for PCA: ", backend) 367 368 # Fit and transform if a `snpobj` is provided 369 if self.snpobj is not None: 370 self.fit_transform(snpobj) 371 372 @property 373 def snpobj(self) -> Optional['SNPObject']: 374 """ 375 Retrieve `snpobj`. 376 377 Returns: 378 **SNPObject:** A SNPObject instance. 379 """ 380 return self.__snpobj 381 382 @snpobj.setter 383 def snpobj(self, x) -> None: 384 """ 385 Update `snpobj`. 386 """ 387 self.__snpobj = x 388 389 @property 390 def backend(self) -> str: 391 """ 392 Retrieve `backend`. 393 394 Returns: 395 **str:** The backend to use (`'sklearn'` or `'pytorch'`). 396 """ 397 return self.__backend 398 399 @backend.setter 400 def backend(self, x: str) -> None: 401 """ 402 Update `backend`. 403 """ 404 self.__backend = x 405 406 @property 407 def n_components(self) -> int: 408 """ 409 Retrieve `n_components`. 410 411 Returns: 412 **int:** The number of principal components. 413 """ 414 return self.__n_components 415 416 @n_components.setter 417 def n_components(self, x: int) -> None: 418 """ 419 Update `n_components`. 420 """ 421 self.__n_components = x 422 423 @property 424 def fitting(self) -> str: 425 """ 426 Retrieve `fitting`. 427 428 Returns: 429 **str:** 430 The fitting approach to use for the SVD computation (only for `backend='pytorch'`). 431 - `'full'`: Full Singular Value Decomposition (SVD). 432 - `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution. 433 """ 434 return self.__fitting 435 436 @fitting.setter 437 def fitting(self, x: str) -> None: 438 """ 439 Update `fitting`. 440 """ 441 self.__fitting = x 442 443 @property 444 def device(self) -> torch.device: 445 """ 446 Retrieve `device`. 447 448 Returns: 449 **torch.device:** Device to use (`'cpu'`, `'gpu'`, `'cuda'`, or `'cuda:<index>'`). 450 """ 451 return self.__device 452 453 @device.setter 454 def device(self, x: str) -> None: 455 """ 456 Update `device`. 457 """ 458 self.__device = self._process_device_argument(x) 459 460 @property 461 def average_strands(self) -> bool: 462 """ 463 Retrieve `average_strands`. 464 465 Returns: 466 **bool:** 467 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 468 """ 469 return self.__average_strands 470 471 @average_strands.setter 472 def average_strands(self, x: bool) -> None: 473 """ 474 Update `average_strands`. 475 """ 476 self.__average_strands = x 477 478 @property 479 def samples_subset(self) -> Optional[Union[int, List[int]]]: 480 """ 481 Retrieve `samples_subset`. 482 483 Returns: 484 **int or list of int:** 485 Subset of samples to include, as an integer for the first samples or a list of sample indices. 486 """ 487 return self.__samples_subset 488 489 @samples_subset.setter 490 def samples_subset(self, x: Optional[Union[int, List[int]]]) -> None: 491 """ 492 Update `samples_subset`. 493 """ 494 self.__samples_subset = x 495 496 @property 497 def snps_subset(self) -> Optional[Union[int, List[int]]]: 498 """ 499 Retrieve `snps_subset`. 500 501 Returns: 502 **int or list of int:** Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 503 """ 504 return self.__snps_subset 505 506 @snps_subset.setter 507 def snps_subset(self, x: Optional[Union[int, List[int]]]) -> None: 508 """ 509 Update `snps_subset`. 510 """ 511 self.__snps_subset = x 512 513 @property 514 def n_components_(self) -> Optional[int]: 515 """ 516 Retrieve `n_components_`. 517 518 Returns: 519 **int:** 520 The effective number of components retained after fitting, 521 calculated as `min(self.n_components, min(n_samples, n_snps))`. 522 """ 523 return self.__n_components_ 524 525 @n_components_.setter 526 def n_components_(self, x: int) -> None: 527 """ 528 Update `n_components_`. 529 """ 530 self.__n_components_ = x 531 532 @property 533 def components_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 534 """ 535 Retrieve `components_`. 536 537 Returns: 538 **tensor or array of shape (n_components_, n_snps):** 539 Matrix of principal components, where each row is a principal component vector. 540 """ 541 return self.__components_ 542 543 @components_.setter 544 def components_(self, x: Union[torch.Tensor, np.ndarray]) -> None: 545 """ 546 Update `components_`. 547 """ 548 self.__components_ = x 549 550 @property 551 def mean_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 552 """ 553 Retrieve `mean_`. 554 555 Returns: 556 **tensor or array of shape (n_snps,):** 557 Per-feature mean vector of the input data used for centering. 558 """ 559 return self.__mean_ 560 561 @mean_.setter 562 def mean_(self, x: Union[torch.Tensor, np.ndarray]) -> None: 563 """ 564 Update `mean_`. 565 """ 566 self.__mean_ = x 567 568 @property 569 def X_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 570 """ 571 Retrieve `X_`. 572 573 Returns: 574 **tensor or array of shape (n_samples, n_snps):** 575 The SNP data matrix used to fit the model. 576 """ 577 return self.__X_ 578 579 @X_.setter 580 def X_(self, x: Union[torch.Tensor, np.ndarray]) -> None: 581 """ 582 Update `X_`. 583 """ 584 self.__X_ = x 585 586 @property 587 def X_new_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 588 """ 589 Retrieve `X_new_`. 590 591 Returns: 592 **tensor or array of shape (n_samples, n_components_):** 593 The transformed SNP data projected onto the `n_components_` principal components. 594 """ 595 return self.__X_new_ 596 597 @X_new_.setter 598 def X_new_(self, x: Union[torch.Tensor, np.ndarray]) -> None: 599 """ 600 Update `X_new_`. 601 """ 602 self.__X_new_ = x 603 604 def copy(self) -> 'PCA': 605 """ 606 Create and return a copy of `self`. 607 608 Returns: 609 **PCA:** 610 A new instance of the current object. 611 """ 612 return copy.copy(self) 613 614 def _process_device_argument(self, device: str): 615 """ 616 Process the device argument to map user-friendly device names to PyTorch device specifications. 617 618 Args: 619 device (str): Device specified by the user. 620 621 Returns: 622 torch.device: PyTorch device object. 623 """ 624 if isinstance(device, str): 625 device_lower = device.lower() 626 if device_lower in ['cpu']: 627 return torch.device('cpu') 628 elif device_lower in ['gpu', 'cuda']: 629 if torch.cuda.is_available(): 630 return torch.device('cuda') 631 else: 632 warnings.warn("CUDA is not available; using CPU instead.") 633 return torch.device('cpu') 634 elif device_lower.startswith('cuda:'): 635 if torch.cuda.is_available(): 636 return torch.device(device_lower) 637 else: 638 warnings.warn(f"CUDA is not available; requested device '{device}' is not available. Using CPU instead.") 639 return torch.device('cpu') 640 else: 641 raise ValueError(f"Unknown device type: '{device}'. Please use 'CPU', 'GPU', 'cuda', or 'cuda:<index>'.") 642 elif isinstance(device, torch.device): 643 return device 644 else: 645 raise TypeError(f"Device must be a string or torch.device, got {type(device)}.") 646 647 def _get_data_from_snpobj( 648 self, 649 snpobj: Optional['SNPObject'] = None, 650 average_strands: Optional[bool] = None, 651 samples_subset: Optional[Union[int, List]] = None, 652 snps_subset: Optional[Union[int, List]] = None 653 ) -> Union[np.ndarray, torch.Tensor]: 654 """ 655 Retrieve and prepare SNP data for PCA analysis, with options for selecting subsets and handling strands. 656 657 This method processes SNP data stored in an `SNPObject`, which may include averaging of paternal 658 and maternal strands or selecting subsets of samples and SNPs. The prepared data is formatted 659 for use in PCA, optionally converted to a PyTorch tensor if the backend is set to 'pytorch'. 660 661 Args: 662 snpobj (SNPObject, optional): 663 A SNPObject object instance. If None, defaults to `self.snpobj`. 664 average_strands (bool, optional): 665 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 666 If None, defaults to `self.average_strands`. 667 samples_subset (int or list of int, optional): 668 Subset of samples to include, as an integer for the first samples or a list of sample indices. 669 If None, defaults to `self.samples_subset`. 670 snps_subset (int or list of int, optional): 671 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 672 If None, defaults to `self.snps_subset`. 673 674 Returns: 675 numpy.ndarray or torch.Tensor: 676 The processed SNP data. If `backend` is set to 'pytorch', the data is returned as a 677 PyTorch tensor on the specified `device`. 678 """ 679 if snpobj is None: 680 snpobj = self.snpobj 681 if average_strands is None: 682 average_strands = self.average_strands 683 if samples_subset is None: 684 samples_subset = self.samples_subset 685 if snps_subset is None: 686 snps_subset = self.snps_subset 687 688 if snpobj.calldata_gt.ndim == 2: 689 X = np.transpose(snpobj.calldata_gt.astype(float), (1,0)) 690 elif snpobj.calldata_gt.ndim == 3: 691 X = np.transpose(snpobj.calldata_gt.astype(float), (1,0,2)) 692 693 if average_strands: 694 X = np.mean(X, axis=2) 695 else: 696 X = np.reshape(X, (-1, X.shape[1])) 697 else: 698 raise ValueError(f"Invalid shape for `calldata_gt`: expected a 2D or 3D array, but got {snpobj.calldata_gt.ndim}D array.") 699 700 # Handle sample and SNP subsets 701 if isinstance(samples_subset, int): 702 X = X[:samples_subset] 703 elif isinstance(samples_subset, list): 704 X = X[samples_subset] 705 706 if isinstance(snps_subset, int): 707 X = X[:, :snps_subset] 708 elif isinstance(snps_subset, list): 709 X = X[:, snps_subset] 710 711 if self.backend == "pytorch": 712 print(f"Converting data to PyTorch tensor on device {self.device}") 713 X = torch.from_numpy(X).to(self.device) 714 715 return X 716 717 def fit( 718 self, 719 snpobj: Optional['SNPObject'] = None, 720 average_strands: Optional[bool] = None, 721 samples_subset: Optional[Union[int, List]] = None, 722 snps_subset: Optional[Union[int, List]] = None 723 ) -> 'PCA': 724 """ 725 Fit the model to the input SNP data stored in the provided `snpobj`. 726 727 Args: 728 snpobj (SNPObject, optional): 729 A SNPObject instance. If None, defaults to `self.snpobj`. 730 average_strands (bool, optional): 731 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 732 If None, defaults to `self.average_strands`. 733 samples_subset (int or list of int, optional): 734 Subset of samples to include, as an integer for the first samples or a list of sample indices. 735 If None, defaults to `self.samples_subset`. 736 snps_subset (int or list of int, optional): 737 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 738 If None, defaults to `self.snps_subset`. 739 740 Returns: 741 **PCA:** 742 The fitted instance of `self`. 743 """ 744 self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset) 745 self.pca.fit(self.X_) 746 747 # Update attributes based on the fitted model 748 self.n_components_ = self.pca.n_components_ 749 self.components_ = self.pca.components_ 750 self.mean_ = self.pca.mean_ 751 752 return self 753 754 def transform( 755 self, 756 snpobj: Optional['SNPObject'] = None, 757 average_strands: Optional[bool] = None, 758 samples_subset: Optional[Union[int, List]] = None, 759 snps_subset: Optional[Union[int, List]] = None 760 ): 761 """ 762 Apply dimensionality reduction to the input SNP data stored in the provided `snpobj` using the fitted model. 763 764 Args: 765 snpobj (SNPObject, optional): 766 A SNPObject instance. If None, defaults to `self.snpobj`. 767 average_strands (bool, optional): 768 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 769 If None, defaults to `self.average_strands`. 770 samples_subset (int or list of int, optional): 771 Subset of samples to include, as an integer for the first samples or a list of sample indices. 772 If None, defaults to `self.samples_subset`. 773 snps_subset (int or list of int, optional): 774 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 775 If None, defaults to `self.snps_subset`. 776 777 Returns: 778 **tensor or array of shape (n_samples, n_components):** 779 The transformed SNP data projected onto the `n_components_` principal components, 780 stored in `self.X_new_`. 781 """ 782 # Retrieve or update the data to transform 783 if snpobj is not None or self.X_ is None: 784 self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset) 785 786 # Apply transformation using the fitted PCA model 787 return self.pca.transform(self.X_) 788 789 def fit_transform(self, snpobj: Optional['SNPObject'] = None, average_strands: Optional[bool] = None, 790 samples_subset: Optional[Union[int, List]] = None, snps_subset: Optional[Union[int, List]] = None): 791 """ 792 Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction 793 on the same SNP data. 794 795 Args: 796 snpobj (SNPObject, optional): 797 A SNPObject instance. If None, defaults to `self.snpobj`. 798 average_strands (bool, optional): 799 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 800 If None, defaults to `self.average_strands`. 801 samples_subset (int or list of int, optional): 802 Subset of samples to include, as an integer for the first samples or a list of sample indices. 803 If None, defaults to `self.samples_subset`. 804 snps_subset (int or list of int, optional): 805 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 806 If None, defaults to `self.snps_subset`. 807 808 Returns: 809 **tensor or array of shape (n_samples, n_components):** 810 The transformed SNP data projected onto the `n_components_` principal components, 811 stored in `self.X_new_`. 812 """ 813 self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset) 814 self.X_new_ = self.pca.fit_transform(self.X_) 815 816 # Update attributes based on the fitted model 817 self.n_components_ = self.pca.n_components_ 818 self.components_ = self.pca.components_ 819 self.mean_ = self.pca.mean_ 820 821 return self.X_new_
A class for Principal Component Analysis (PCA) on SNP data stored in a snputils.snp.genobj.SNPObject
.
This class employs either
sklearn.decomposition.PCA
or the custom TorchPCA
implementation for efficient GPU-accelerated analysis.
The PCA class supports both separate and averaged strand processing for SNP data. If the snpobj
parameter is
provided during instantiation, the fit_transform
method will be automatically called,
applying PCA to transform the data according to the selected configuration.
312 def __init__( 313 self, 314 snpobj: Optional['SNPObject'] = None, 315 backend: str = 'pytorch', 316 n_components: int = 2, 317 fitting: str = 'full', 318 device: str = 'cpu', 319 average_strands: bool = True, 320 samples_subset: Optional[Union[int, List]] = None, 321 snps_subset: Optional[Union[int, List]] = None 322 ): 323 """ 324 Args: 325 snpobj (SNPObject, optional): 326 A SNPObject instance. 327 backend (str, default='pytorch'): 328 The backend to use (`'sklearn'` or `'pytorch'`). Default is 'pytorch'. 329 n_components (int, default=2): 330 The number of principal components. Default is 2. 331 fitting (str, default='full'): 332 The fitting approach to use for the SVD computation (only for `backend='pytorch'`). 333 - `'full'`: Full Singular Value Decomposition (SVD). 334 - `'reduced'`: Economy SVD. 335 - `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution. 336 Default is 'full'. 337 device (str, default='cpu'): 338 Device to use (`'cpu'`, `'gpu'`, `'cuda'`, or `'cuda:<index>'`). Default is 'cpu'. 339 average_strands (bool, default=True): 340 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 341 samples_subset (int or list of int, optional): 342 Subset of samples to include, as an integer for the first samples or a list of sample indices. 343 snps_subset (int or list of int, optional): 344 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 345 """ 346 self.__snpobj = snpobj 347 self.__backend = backend 348 self.__n_components = n_components 349 self.__fitting = fitting 350 self.__device = self._process_device_argument(device) 351 self.__average_strands = average_strands 352 self.__samples_subset = samples_subset 353 self.__snps_subset = snps_subset 354 self.__X_ = None 355 self.__X_new_ = None # Store transformed SNP data 356 self.__n_components_ = None 357 self.__components_ = None 358 self.__mean_ = None 359 360 # Initialize PCA backend 361 if self.backend == "pytorch": 362 self.pca = TorchPCA(n_components=self.n_components, fitting=self.fitting) 363 elif self.backend == "sklearn": 364 self.pca = skPCA(n_components=self.n_components) 365 else: 366 raise ValueError("Unknown backend for PCA: ", backend) 367 368 # Fit and transform if a `snpobj` is provided 369 if self.snpobj is not None: 370 self.fit_transform(snpobj)
Arguments:
- snpobj (SNPObject, optional): A SNPObject instance.
- backend (str, default='pytorch'): The backend to use (
'sklearn'
or'pytorch'
). Default is 'pytorch'. - n_components (int, default=2): The number of principal components. Default is 2.
- fitting (str, default='full'): The fitting approach to use for the SVD computation (only for
backend='pytorch'
).'full'
: Full Singular Value Decomposition (SVD).'reduced'
: Economy SVD.'lowrank'
: Low-rank approximation, which provides a faster but approximate solution. Default is 'full'.
- device (str, default='cpu'): Device to use (
'cpu'
,'gpu'
,'cuda'
, or'cuda:<index>'
). Default is 'cpu'. - average_strands (bool, default=True): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
- samples_subset (int or list of int, optional): Subset of samples to include, as an integer for the first samples or a list of sample indices.
- snps_subset (int or list of int, optional): Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices.
423 @property 424 def fitting(self) -> str: 425 """ 426 Retrieve `fitting`. 427 428 Returns: 429 **str:** 430 The fitting approach to use for the SVD computation (only for `backend='pytorch'`). 431 - `'full'`: Full Singular Value Decomposition (SVD). 432 - `'lowrank'`: Low-rank approximation, which provides a faster but approximate solution. 433 """ 434 return self.__fitting
Retrieve fitting
.
Returns:
str: The fitting approach to use for the SVD computation (only for
backend='pytorch'
). -'full'
: Full Singular Value Decomposition (SVD). -'lowrank'
: Low-rank approximation, which provides a faster but approximate solution.
460 @property 461 def average_strands(self) -> bool: 462 """ 463 Retrieve `average_strands`. 464 465 Returns: 466 **bool:** 467 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 468 """ 469 return self.__average_strands
Retrieve average_strands
.
Returns:
bool: True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
478 @property 479 def samples_subset(self) -> Optional[Union[int, List[int]]]: 480 """ 481 Retrieve `samples_subset`. 482 483 Returns: 484 **int or list of int:** 485 Subset of samples to include, as an integer for the first samples or a list of sample indices. 486 """ 487 return self.__samples_subset
Retrieve samples_subset
.
Returns:
int or list of int: Subset of samples to include, as an integer for the first samples or a list of sample indices.
496 @property 497 def snps_subset(self) -> Optional[Union[int, List[int]]]: 498 """ 499 Retrieve `snps_subset`. 500 501 Returns: 502 **int or list of int:** Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 503 """ 504 return self.__snps_subset
Retrieve snps_subset
.
Returns:
int or list of int: Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices.
513 @property 514 def n_components_(self) -> Optional[int]: 515 """ 516 Retrieve `n_components_`. 517 518 Returns: 519 **int:** 520 The effective number of components retained after fitting, 521 calculated as `min(self.n_components, min(n_samples, n_snps))`. 522 """ 523 return self.__n_components_
Retrieve n_components_
.
Returns:
int: The effective number of components retained after fitting, calculated as
min(self.n_components, min(n_samples, n_snps))
.
532 @property 533 def components_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 534 """ 535 Retrieve `components_`. 536 537 Returns: 538 **tensor or array of shape (n_components_, n_snps):** 539 Matrix of principal components, where each row is a principal component vector. 540 """ 541 return self.__components_
Retrieve components_
.
Returns:
tensor or array of shape (n_components_, n_snps): Matrix of principal components, where each row is a principal component vector.
550 @property 551 def mean_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 552 """ 553 Retrieve `mean_`. 554 555 Returns: 556 **tensor or array of shape (n_snps,):** 557 Per-feature mean vector of the input data used for centering. 558 """ 559 return self.__mean_
Retrieve mean_
.
Returns:
tensor or array of shape (n_snps,): Per-feature mean vector of the input data used for centering.
568 @property 569 def X_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 570 """ 571 Retrieve `X_`. 572 573 Returns: 574 **tensor or array of shape (n_samples, n_snps):** 575 The SNP data matrix used to fit the model. 576 """ 577 return self.__X_
Retrieve X_
.
Returns:
tensor or array of shape (n_samples, n_snps): The SNP data matrix used to fit the model.
586 @property 587 def X_new_(self) -> Optional[Union[torch.Tensor, np.ndarray]]: 588 """ 589 Retrieve `X_new_`. 590 591 Returns: 592 **tensor or array of shape (n_samples, n_components_):** 593 The transformed SNP data projected onto the `n_components_` principal components. 594 """ 595 return self.__X_new_
Retrieve X_new_
.
Returns:
tensor or array of shape (n_samples, n_components_): The transformed SNP data projected onto the
n_components_
principal components.
604 def copy(self) -> 'PCA': 605 """ 606 Create and return a copy of `self`. 607 608 Returns: 609 **PCA:** 610 A new instance of the current object. 611 """ 612 return copy.copy(self)
Create and return a copy of self
.
Returns:
PCA: A new instance of the current object.
717 def fit( 718 self, 719 snpobj: Optional['SNPObject'] = None, 720 average_strands: Optional[bool] = None, 721 samples_subset: Optional[Union[int, List]] = None, 722 snps_subset: Optional[Union[int, List]] = None 723 ) -> 'PCA': 724 """ 725 Fit the model to the input SNP data stored in the provided `snpobj`. 726 727 Args: 728 snpobj (SNPObject, optional): 729 A SNPObject instance. If None, defaults to `self.snpobj`. 730 average_strands (bool, optional): 731 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 732 If None, defaults to `self.average_strands`. 733 samples_subset (int or list of int, optional): 734 Subset of samples to include, as an integer for the first samples or a list of sample indices. 735 If None, defaults to `self.samples_subset`. 736 snps_subset (int or list of int, optional): 737 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 738 If None, defaults to `self.snps_subset`. 739 740 Returns: 741 **PCA:** 742 The fitted instance of `self`. 743 """ 744 self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset) 745 self.pca.fit(self.X_) 746 747 # Update attributes based on the fitted model 748 self.n_components_ = self.pca.n_components_ 749 self.components_ = self.pca.components_ 750 self.mean_ = self.pca.mean_ 751 752 return self
Fit the model to the input SNP data stored in the provided snpobj
.
Arguments:
- snpobj (SNPObject, optional): A SNPObject instance. If None, defaults to
self.snpobj
. - average_strands (bool, optional): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
If None, defaults to
self.average_strands
. - samples_subset (int or list of int, optional): Subset of samples to include, as an integer for the first samples or a list of sample indices.
If None, defaults to
self.samples_subset
. - snps_subset (int or list of int, optional): Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices.
If None, defaults to
self.snps_subset
.
Returns:
PCA: The fitted instance of
self
.
754 def transform( 755 self, 756 snpobj: Optional['SNPObject'] = None, 757 average_strands: Optional[bool] = None, 758 samples_subset: Optional[Union[int, List]] = None, 759 snps_subset: Optional[Union[int, List]] = None 760 ): 761 """ 762 Apply dimensionality reduction to the input SNP data stored in the provided `snpobj` using the fitted model. 763 764 Args: 765 snpobj (SNPObject, optional): 766 A SNPObject instance. If None, defaults to `self.snpobj`. 767 average_strands (bool, optional): 768 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 769 If None, defaults to `self.average_strands`. 770 samples_subset (int or list of int, optional): 771 Subset of samples to include, as an integer for the first samples or a list of sample indices. 772 If None, defaults to `self.samples_subset`. 773 snps_subset (int or list of int, optional): 774 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 775 If None, defaults to `self.snps_subset`. 776 777 Returns: 778 **tensor or array of shape (n_samples, n_components):** 779 The transformed SNP data projected onto the `n_components_` principal components, 780 stored in `self.X_new_`. 781 """ 782 # Retrieve or update the data to transform 783 if snpobj is not None or self.X_ is None: 784 self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset) 785 786 # Apply transformation using the fitted PCA model 787 return self.pca.transform(self.X_)
Apply dimensionality reduction to the input SNP data stored in the provided snpobj
using the fitted model.
Arguments:
- snpobj (SNPObject, optional): A SNPObject instance. If None, defaults to
self.snpobj
. - average_strands (bool, optional): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
If None, defaults to
self.average_strands
. - samples_subset (int or list of int, optional): Subset of samples to include, as an integer for the first samples or a list of sample indices.
If None, defaults to
self.samples_subset
. - snps_subset (int or list of int, optional): Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices.
If None, defaults to
self.snps_subset
.
Returns:
tensor or array of shape (n_samples, n_components): The transformed SNP data projected onto the
n_components_
principal components, stored inself.X_new_
.
789 def fit_transform(self, snpobj: Optional['SNPObject'] = None, average_strands: Optional[bool] = None, 790 samples_subset: Optional[Union[int, List]] = None, snps_subset: Optional[Union[int, List]] = None): 791 """ 792 Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction 793 on the same SNP data. 794 795 Args: 796 snpobj (SNPObject, optional): 797 A SNPObject instance. If None, defaults to `self.snpobj`. 798 average_strands (bool, optional): 799 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 800 If None, defaults to `self.average_strands`. 801 samples_subset (int or list of int, optional): 802 Subset of samples to include, as an integer for the first samples or a list of sample indices. 803 If None, defaults to `self.samples_subset`. 804 snps_subset (int or list of int, optional): 805 Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices. 806 If None, defaults to `self.snps_subset`. 807 808 Returns: 809 **tensor or array of shape (n_samples, n_components):** 810 The transformed SNP data projected onto the `n_components_` principal components, 811 stored in `self.X_new_`. 812 """ 813 self.X_ = self._get_data_from_snpobj(snpobj, average_strands, samples_subset, snps_subset) 814 self.X_new_ = self.pca.fit_transform(self.X_) 815 816 # Update attributes based on the fitted model 817 self.n_components_ = self.pca.n_components_ 818 self.components_ = self.pca.components_ 819 self.mean_ = self.pca.mean_ 820 821 return self.X_new_
Fit the model to the SNP data stored in the provided snpobj
and apply the dimensionality reduction
on the same SNP data.
Arguments:
- snpobj (SNPObject, optional): A SNPObject instance. If None, defaults to
self.snpobj
. - average_strands (bool, optional): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
If None, defaults to
self.average_strands
. - samples_subset (int or list of int, optional): Subset of samples to include, as an integer for the first samples or a list of sample indices.
If None, defaults to
self.samples_subset
. - snps_subset (int or list of int, optional): Subset of SNPs to include, as an integer for the first SNPs or a list of SNP indices.
If None, defaults to
self.snps_subset
.
Returns:
tensor or array of shape (n_samples, n_components): The transformed SNP data projected onto the
n_components_
principal components, stored inself.X_new_
.