snputils.processing.maasmds
1import pathlib 2import numpy as np 3import copy 4from typing import Optional, Dict, List, Union 5 6from snputils.snp.genobj.snpobj import SNPObject 7from snputils.ancestry.genobj.local import LocalAncestryObject 8from ._utils.mds_distance import distance_mat, mds_transform 9from ._utils.gen_tools import process_calldata_gt, process_labels_weights 10 11 12class maasMDS: 13 """ 14 A class for performing multiple array ancestry-specific multidimensional scaling (maasMDS) on SNP data. 15 16 The maasMDS class focuses on genotype segments from the ancestry of interest when the `is_masked` flag is set to `True`. It offers 17 flexible processing options, allowing either separate handling of masked haplotype strands or combining (averaging) strands into a 18 single composite representation for each individual. Moreover, the analysis can be performed on individual-level data, group-level SNP 19 frequencies, or a combination of both. 20 21 This class supports both separate and averaged strand processing for SNP data. If the `snpobj`, 22 `laiobj`, `labels_file`, and `ancestry` parameters are all provided during instantiation, 23 the `fit_transform` method will be automatically called, applying the specified maasMDS method to transform 24 the data upon instantiation. 25 """ 26 def __init__( 27 self, 28 snpobj: Optional['SNPObject'] = None, 29 laiobj: Optional['LocalAncestryObject'] = None, 30 labels_file: Optional[str] = None, 31 ancestry: Optional[Union[int, str]] = None, 32 is_masked: bool = True, 33 average_strands: bool = False, 34 force_nan_incomplete_strands: bool = False, 35 is_weighted: bool = False, 36 groups_to_remove: Dict[int, List[str]] = {}, 37 min_percent_snps: float = 4, 38 group_snp_frequencies_only: bool = True, 39 save_masks: bool = False, 40 load_masks: bool = False, 41 masks_file: Union[str, pathlib.Path] = 'masks.npz', 42 distance_type: str = 'AP', 43 n_components: int = 2, 44 rsid_or_chrompos: int = 2 45 ): 46 """ 47 Args: 48 snpobj (SNPObject, optional): 49 A SNPObject instance. 50 laiobj (LAIObject, optional): 51 A LAIObject instance. 52 labels_file (str, optional): 53 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 54 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 55 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 56 combined into groups, with respective weights. 57 ancestry (int or str, optional): 58 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. The ancestry input can be: 59 - An integer (e.g., 0, 1, 2). 60 - A string representation of an integer (e.g., '0', '1'). 61 - A string matching one of the ancestry map values (e.g., 'Africa'). 62 is_masked (bool, default=True): 63 If `True`, applies ancestry-specific masking to the genotype matrix, retaining only genotype data 64 corresponding to the specified `ancestry`. If `False`, uses the full, unmasked genotype matrix. 65 average_strands (bool, default=False): 66 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 67 force_nan_incomplete_strands (bool): 68 If `True`, sets the result to NaN if either haplotype in a pair is NaN. 69 Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). 70 is_weighted (bool, default=False): 71 True if weights are provided in the labels file, or False otherwise. 72 groups_to_remove (dict of int to list of str, default={}): 73 Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 74 lists of groups to remove for each array. 75 Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 76 min_percent_snps (float, default=4.0): 77 Minimum percentage of SNPs to be known in an individual for an individual to be included in the analysis. 78 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 79 group_snp_frequencies_only (bool, default=True): 80 If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when `is_weighted` is 81 set to True and a `combination` column is provided in the `labels_file`, meaning individuals are aggregated into groups based on their assigned 82 labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies when 83 `is_weighted` is True and a `combination` column is provided. 84 save_masks (bool, default=False): 85 True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 86 load_masks (bool, default=False): 87 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise. 88 masks_file (str or pathlib.Path, default='masks.npz'): 89 Path to the `.npz` file used for saving/loading masked matrices. 90 distance_type (str, default='AP'): 91 Distance metric to use. Options to choose from are: 'Manhattan', 'RMS' (Root Mean Square), 'AP' (Average Pairwise). 92 If `average_strands=True`, use 'distance_type=AP'. 93 n_components (int, default=2): 94 The number of principal components. 95 rsid_or_chrompos (int, default=2): 96 Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 97 """ 98 self.__snpobj = snpobj 99 self.__laiobj = laiobj 100 self.__labels_file = labels_file 101 self.__ancestry = self._define_ancestry(ancestry, laiobj.ancestry_map) 102 self.__is_masked = is_masked 103 self.__average_strands = average_strands 104 self.__force_nan_incomplete_strands = force_nan_incomplete_strands 105 self.__groups_to_remove = groups_to_remove 106 self.__min_percent_snps = min_percent_snps 107 self.__group_snp_frequencies_only = group_snp_frequencies_only 108 self.__is_weighted = is_weighted 109 self.__save_masks = save_masks 110 self.__load_masks = load_masks 111 self.__masks_file = masks_file 112 self.__distance_type = distance_type 113 self.__n_components = n_components 114 self.__rsid_or_chrompos = rsid_or_chrompos 115 self.__X_new_ = None # Store transformed SNP data 116 self.__haplotypes_ = None # Store haplotypes after filtering if min_percent_snps > 0 117 self.__samples_ = None # Store samples after filtering if min_percent_snps > 0 118 self.__variants_id_ = None # Store variants ID (after filtering SNPs not in laiobj) 119 120 # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided 121 if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None: 122 self.fit_transform(snpobj, laiobj, labels_file, ancestry) 123 124 def __getitem__(self, key): 125 """ 126 To access an attribute of the class using the square bracket notation, 127 similar to a dictionary. 128 """ 129 try: 130 return getattr(self, key) 131 except AttributeError: 132 raise KeyError(f'Invalid key: {key}') 133 134 def __setitem__(self, key, value): 135 """ 136 To set an attribute of the class using the square bracket notation, 137 similar to a dictionary. 138 """ 139 try: 140 setattr(self, key, value) 141 except AttributeError: 142 raise KeyError(f'Invalid key: {key}') 143 144 def copy(self) -> 'maasMDS': 145 """ 146 Create and return a copy of `self`. 147 148 Returns: 149 **maasMDS:** 150 A new instance of the current object. 151 """ 152 return copy.copy(self) 153 154 @property 155 def snpobj(self) -> Optional['SNPObject']: 156 """ 157 Retrieve `snpobj`. 158 159 Returns: 160 **SNPObject:** A SNPObject instance. 161 """ 162 return self.__snpobj 163 164 @snpobj.setter 165 def snpobj(self, x: 'SNPObject') -> None: 166 """ 167 Update `snpobj`. 168 """ 169 self.__snpobj = x 170 171 @property 172 def laiobj(self) -> Optional['LocalAncestryObject']: 173 """ 174 Retrieve `laiobj`. 175 176 Returns: 177 **LocalAncestryObject:** A LAIObject instance. 178 """ 179 return self.__laiobj 180 181 @laiobj.setter 182 def laiobj(self, x: 'LocalAncestryObject') -> None: 183 """ 184 Update `laiobj`. 185 """ 186 self.__laiobj = x 187 188 @property 189 def labels_file(self) -> Optional[str]: 190 """ 191 Retrieve `labels_file`. 192 193 Returns: 194 **str:** 195 Path to the labels file in `.tsv` format. 196 """ 197 return self.__labels_file 198 199 @labels_file.setter 200 def labels_file(self, x: str) -> None: 201 """ 202 Update `labels_file`. 203 """ 204 self.__labels_file = x 205 206 @property 207 def ancestry(self) -> Optional[str]: 208 """ 209 Retrieve `ancestry`. 210 211 Returns: 212 **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. 213 """ 214 return self.__ancestry 215 216 @ancestry.setter 217 def ancestry(self, x: str) -> None: 218 """ 219 Update `ancestry`. 220 """ 221 self.__ancestry = x 222 223 @property 224 def is_masked(self) -> bool: 225 """ 226 Retrieve `is_masked`. 227 228 Returns: 229 **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise. 230 """ 231 return self.__is_masked 232 233 @is_masked.setter 234 def is_masked(self, x: bool) -> None: 235 """ 236 Update `is_masked`. 237 """ 238 self.__is_masked = x 239 240 @property 241 def average_strands(self) -> bool: 242 """ 243 Retrieve `average_strands`. 244 245 Returns: 246 **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 247 """ 248 return self.__average_strands 249 250 @average_strands.setter 251 def average_strands(self, x: bool) -> None: 252 """ 253 Update `average_strands`. 254 """ 255 self.__average_strands = x 256 257 @property 258 def force_nan_incomplete_strands(self) -> bool: 259 """ 260 Retrieve `force_nan_incomplete_strands`. 261 262 Returns: 263 **bool**: If `True`, sets the result to NaN if either haplotype in a pair is NaN. 264 Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). 265 """ 266 return self.__force_nan_incomplete_strands 267 268 @force_nan_incomplete_strands.setter 269 def force_nan_incomplete_strands(self, x: bool) -> None: 270 """ 271 Update `force_nan_incomplete_strands`. 272 """ 273 self.__force_nan_incomplete_strands = x 274 275 @property 276 def is_weighted(self) -> bool: 277 """ 278 Retrieve `is_weighted`. 279 280 Returns: 281 **bool:** True if weights are provided in the labels file, or False otherwise. 282 """ 283 return self.__is_weighted 284 285 @is_weighted.setter 286 def is_weighted(self, x: bool) -> None: 287 """ 288 Update `is_weighted`. 289 """ 290 self.__is_weighted = x 291 292 @property 293 def groups_to_remove(self) -> Dict[int, List[str]]: 294 """ 295 Retrieve `groups_to_remove`. 296 297 Returns: 298 **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 299 lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 300 """ 301 return self.__groups_to_remove 302 303 @groups_to_remove.setter 304 def groups_to_remove(self, x: Dict[int, List[str]]) -> None: 305 """ 306 Update `groups_to_remove`. 307 """ 308 self.__groups_to_remove = x 309 310 @property 311 def min_percent_snps(self) -> float: 312 """ 313 Retrieve `min_percent_snps`. 314 315 Returns: 316 **float:** 317 Minimum percentage of SNPs to be known in an individual for an individual to be included in the analysis. 318 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 319 """ 320 return self.__min_percent_snps 321 322 @min_percent_snps.setter 323 def min_percent_snps(self, x: float) -> None: 324 """ 325 Update `min_percent_snps`. 326 """ 327 self.__min_percent_snps = x 328 329 @property 330 def group_snp_frequencies_only(self) -> bool: 331 """ 332 Retrieve `group_snp_frequencies_only`. 333 334 Returns: 335 **bool:** 336 If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies 337 when `is_weighted` is set to True and a `combination` column is provided in the `labels_file`, meaning individuals are 338 aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone 339 or on both individual-level and group-level SNP frequencies when `is_weighted` is True and a `combination` column is provided. 340 """ 341 return self.__group_snp_frequencies_only 342 343 @group_snp_frequencies_only.setter 344 def group_snp_frequencies_only(self, x: bool) -> None: 345 """ 346 Update `group_snp_frequencies_only`. 347 """ 348 self.__group_snp_frequencies_only = x 349 350 @property 351 def save_masks(self) -> bool: 352 """ 353 Retrieve `save_masks`. 354 355 Returns: 356 **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 357 """ 358 return self.__save_masks 359 360 @save_masks.setter 361 def save_masks(self, x: bool) -> None: 362 """ 363 Update `save_masks`. 364 """ 365 self.__save_masks = x 366 367 @property 368 def load_masks(self) -> bool: 369 """ 370 Retrieve `load_masks`. 371 372 Returns: 373 **bool:** 374 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 375 by `masks_file`, or False otherwise. 376 """ 377 return self.__load_masks 378 379 @load_masks.setter 380 def load_masks(self, x: bool) -> None: 381 """ 382 Update `load_masks`. 383 """ 384 self.__load_masks = x 385 386 @property 387 def masks_file(self) -> Union[str, pathlib.Path]: 388 """ 389 Retrieve `masks_file`. 390 391 Returns: 392 **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices. 393 """ 394 return self.__masks_file 395 396 @masks_file.setter 397 def masks_file(self, x: Union[str, pathlib.Path]) -> None: 398 """ 399 Update `masks_file`. 400 """ 401 self.__masks_file = x 402 403 @property 404 def distance_type(self) -> str: 405 """ 406 Retrieve `distance_type`. 407 408 Returns: 409 **str:** 410 Distance metric to use. Options to choose from are: 'Manhattan', 'RMS' (Root Mean Square), 'AP' (Average Pairwise). 411 If `average_strands=True`, use 'distance_type=AP'. 412 """ 413 return self.__distance_type 414 415 @distance_type.setter 416 def distance_type(self, x: str) -> None: 417 """ 418 Update `distance_type`. 419 """ 420 self.__distance_type = x 421 422 @property 423 def n_components(self) -> int: 424 """ 425 Retrieve `n_components`. 426 427 Returns: 428 **int:** The number of principal components. 429 """ 430 return self.__n_components 431 432 @n_components.setter 433 def n_components(self, x: int) -> None: 434 """ 435 Update `n_components`. 436 """ 437 self.__n_components = x 438 439 @property 440 def rsid_or_chrompos(self) -> int: 441 """ 442 Retrieve `rsid_or_chrompos`. 443 444 Returns: 445 **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 446 """ 447 return self.__rsid_or_chrompos 448 449 @rsid_or_chrompos.setter 450 def rsid_or_chrompos(self, x: int) -> None: 451 """ 452 Update `rsid_or_chrompos`. 453 """ 454 self.__rsid_or_chrompos = x 455 456 @property 457 def X_new_(self) -> Optional[np.ndarray]: 458 """ 459 Retrieve `X_new_`. 460 461 Returns: 462 **array of shape (n_haplotypes_, n_components):** 463 The transformed SNP data projected onto the `n_components` principal components. 464 n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 465 (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 466 `(n_samples * 2, n_components)`. 467 """ 468 return self.__X_new_ 469 470 @X_new_.setter 471 def X_new_(self, x: np.ndarray) -> None: 472 """ 473 Update `X_new_`. 474 """ 475 self.__X_new_ = x 476 477 @property 478 def haplotypes_(self) -> Optional[List[str]]: 479 """ 480 Retrieve `haplotypes_`. 481 482 Returns: 483 list of str: 484 A list of unique haplotype identifiers. 485 """ 486 if isinstance(self.__haplotypes_, np.ndarray): 487 return self.__haplotypes_.ravel().tolist() # Flatten and convert NumPy array to a list 488 elif isinstance(self.__haplotypes_, list): 489 if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray): 490 return self.__haplotypes_[0].ravel().tolist() # Handle list containing a single array 491 return self.__haplotypes_ # Already a flat list 492 elif self.__haplotypes_ is None: 493 return None # If no haplotypes are set 494 else: 495 raise TypeError("`haplotypes_` must be a list or a NumPy array.") 496 497 @haplotypes_.setter 498 def haplotypes_(self, x: Union[np.ndarray, List[str]]) -> None: 499 """ 500 Update `haplotypes_`. 501 """ 502 if isinstance(x, np.ndarray): 503 self.__haplotypes_ = x.ravel().tolist() # Flatten and convert to a list 504 elif isinstance(x, list): 505 if len(x) == 1 and isinstance(x[0], np.ndarray): # Handle list containing a single array 506 self.__haplotypes_ = x[0].ravel().tolist() 507 else: 508 self.__haplotypes_ = x # Use directly if already a list 509 else: 510 raise TypeError("`x` must be a list or a NumPy array.") 511 512 @property 513 def samples_(self) -> Optional[List[str]]: 514 """ 515 Retrieve `samples_`. 516 517 Returns: 518 list of str: 519 A list of sample identifiers based on `haplotypes_` and `average_strands`. 520 """ 521 haplotypes = self.haplotypes_ 522 if haplotypes is None: 523 return None 524 if self.__average_strands: 525 return haplotypes 526 else: 527 return [x[:-2] for x in haplotypes] 528 529 @property 530 def variants_id_(self) -> Optional[np.ndarray]: 531 """ 532 Retrieve `variants_id_`. 533 534 Returns: 535 **array of shape (n_snp,):** 536 An array containing unique identifiers (IDs) for each SNP, 537 potentially reduced if there are SNPs not present in the `laiboj`. 538 The format will depend on `rsid_or_chrompos`. 539 """ 540 return self.__variants_id_ 541 542 @variants_id_.setter 543 def variants_id_(self, x: np.ndarray) -> None: 544 """ 545 Update `variants_id_`. 546 """ 547 self.__variants_id_ = x 548 549 @property 550 def n_haplotypes(self) -> Optional[int]: 551 """ 552 Retrieve `n_haplotypes`. 553 554 Returns: 555 **int:** 556 The total number of haplotypes, potentially reduced if filtering is applied 557 (`min_percent_snps > 0`). 558 """ 559 return len(self.__haplotypes_) 560 561 @property 562 def n_samples(self) -> Optional[int]: 563 """ 564 Retrieve `n_samples`. 565 566 Returns: 567 **int:** 568 The total number of samples, potentially reduced if filtering is applied 569 (`min_percent_snps > 0`). 570 """ 571 return len(np.unique(self.samples_)) 572 573 @staticmethod 574 def _define_ancestry(ancestry, ancestry_map): 575 """ 576 Determine the ancestry index based on different input types. 577 578 Args: 579 ancestry (int or str): The ancestry input, which can be: 580 - An integer (e.g., 0, 1, 2). 581 - A string representation of an integer (e.g., '0', '1'). 582 - A string matching one of the ancestry map values (e.g., 'Africa'). 583 ancestry_map (dict): A dictionary mapping ancestry indices (as strings) to ancestry names. 584 585 Returns: 586 int: The corresponding ancestry index. 587 """ 588 if isinstance(ancestry, int): 589 return ancestry 590 elif isinstance(ancestry, str) and ancestry.isdigit(): 591 return int(ancestry) 592 elif ancestry in ancestry_map.values(): 593 return int(next(key for key, value in ancestry_map.items() if value == ancestry)) 594 else: 595 raise ValueError(f"Invalid ancestry input: {ancestry}") 596 597 @staticmethod 598 def _load_masks_file(masks_file): 599 mask_files = np.load(masks_file, allow_pickle=True) 600 mask = mask_files['mask'] 601 rs_ID_list = mask_files['rs_ID_list'] 602 ind_ID_list = mask_files['ind_ID_list'] 603 groups = mask_files['labels'] 604 weights = mask_files['weights'] 605 return mask, rs_ID_list, ind_ID_list, groups, weights 606 607 def fit_transform( 608 self, 609 snpobj: Optional['SNPObject'] = None, 610 laiobj: Optional['LocalAncestryObject'] = None, 611 labels_file: Optional[str] = None, 612 ancestry: Optional[str] = None, 613 average_strands: Optional[bool] = None 614 ) -> np.ndarray: 615 """ 616 Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data. 617 618 Args: 619 snpobj (SNPObject, optional): 620 A SNPObject instance. 621 laiobj (LAIObject, optional): 622 A LAIObject instance. 623 labels_file (str, optional): 624 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 625 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 626 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 627 combined into groups, with respective weights. 628 ancestry (str, optional): 629 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 630 average_strands (bool, optional): 631 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 632 If None, defaults to `self.average_strands`. 633 634 Returns: 635 **array of shape (n_samples, n_components):** 636 The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`. 637 """ 638 if snpobj is None: 639 snpobj = self.snpobj 640 if laiobj is None: 641 laiobj = self.laiobj 642 if labels_file is None: 643 labels_file = self.labels_file 644 if ancestry is None: 645 ancestry = self.ancestry 646 if average_strands is None: 647 average_strands = self.average_strands 648 649 if not self.is_masked: 650 self.ancestry = 1 651 if self.load_masks: 652 # Load precomputed ancestry-based masked genotype matrix, SNP identifiers, haplotype identifiers, and weights 653 mask, variants_id, haplotypes, _, weights = self._load_masks_file(self.masks_file) 654 else: 655 # Process genotype data with optional ancestry-based masking and return the corresponding SNP and individual identifiers 656 mask, variants_id, haplotypes = process_calldata_gt( 657 self.snpobj, 658 self.laiobj, 659 self.ancestry, 660 self.average_strands, 661 self.force_nan_incomplete_strands, 662 self.is_masked, 663 self.rsid_or_chrompos 664 ) 665 666 # Process individual genomic labels and weights, aligning them with a masked genotype matrix by 667 # filtering out low-coverage individuals, reordering data to match the matrix structure, and 668 # handling group-based adjustments 669 mask, haplotypes, groups, weights = process_labels_weights( 670 self.labels_file, 671 mask, 672 variants_id, 673 haplotypes, 674 self.average_strands, 675 self.ancestry, 676 self.min_percent_snps, 677 self.group_snp_frequencies_only, 678 self.groups_to_remove, 679 self.is_weighted, 680 self.save_masks, 681 self.masks_file 682 ) 683 684 distance_list = [[distance_mat(first=mask[self.ancestry], dist_func=self.distance_type)]] 685 686 self.X_new_ = mds_transform(distance_list, groups, weights, haplotypes, self.n_components) 687 688 self.haplotypes_ = haplotypes 689 self.variants_id_ = variants_id
13class maasMDS: 14 """ 15 A class for performing multiple array ancestry-specific multidimensional scaling (maasMDS) on SNP data. 16 17 The maasMDS class focuses on genotype segments from the ancestry of interest when the `is_masked` flag is set to `True`. It offers 18 flexible processing options, allowing either separate handling of masked haplotype strands or combining (averaging) strands into a 19 single composite representation for each individual. Moreover, the analysis can be performed on individual-level data, group-level SNP 20 frequencies, or a combination of both. 21 22 This class supports both separate and averaged strand processing for SNP data. If the `snpobj`, 23 `laiobj`, `labels_file`, and `ancestry` parameters are all provided during instantiation, 24 the `fit_transform` method will be automatically called, applying the specified maasMDS method to transform 25 the data upon instantiation. 26 """ 27 def __init__( 28 self, 29 snpobj: Optional['SNPObject'] = None, 30 laiobj: Optional['LocalAncestryObject'] = None, 31 labels_file: Optional[str] = None, 32 ancestry: Optional[Union[int, str]] = None, 33 is_masked: bool = True, 34 average_strands: bool = False, 35 force_nan_incomplete_strands: bool = False, 36 is_weighted: bool = False, 37 groups_to_remove: Dict[int, List[str]] = {}, 38 min_percent_snps: float = 4, 39 group_snp_frequencies_only: bool = True, 40 save_masks: bool = False, 41 load_masks: bool = False, 42 masks_file: Union[str, pathlib.Path] = 'masks.npz', 43 distance_type: str = 'AP', 44 n_components: int = 2, 45 rsid_or_chrompos: int = 2 46 ): 47 """ 48 Args: 49 snpobj (SNPObject, optional): 50 A SNPObject instance. 51 laiobj (LAIObject, optional): 52 A LAIObject instance. 53 labels_file (str, optional): 54 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 55 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 56 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 57 combined into groups, with respective weights. 58 ancestry (int or str, optional): 59 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. The ancestry input can be: 60 - An integer (e.g., 0, 1, 2). 61 - A string representation of an integer (e.g., '0', '1'). 62 - A string matching one of the ancestry map values (e.g., 'Africa'). 63 is_masked (bool, default=True): 64 If `True`, applies ancestry-specific masking to the genotype matrix, retaining only genotype data 65 corresponding to the specified `ancestry`. If `False`, uses the full, unmasked genotype matrix. 66 average_strands (bool, default=False): 67 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 68 force_nan_incomplete_strands (bool): 69 If `True`, sets the result to NaN if either haplotype in a pair is NaN. 70 Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). 71 is_weighted (bool, default=False): 72 True if weights are provided in the labels file, or False otherwise. 73 groups_to_remove (dict of int to list of str, default={}): 74 Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 75 lists of groups to remove for each array. 76 Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 77 min_percent_snps (float, default=4.0): 78 Minimum percentage of SNPs to be known in an individual for an individual to be included in the analysis. 79 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 80 group_snp_frequencies_only (bool, default=True): 81 If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when `is_weighted` is 82 set to True and a `combination` column is provided in the `labels_file`, meaning individuals are aggregated into groups based on their assigned 83 labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies when 84 `is_weighted` is True and a `combination` column is provided. 85 save_masks (bool, default=False): 86 True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 87 load_masks (bool, default=False): 88 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise. 89 masks_file (str or pathlib.Path, default='masks.npz'): 90 Path to the `.npz` file used for saving/loading masked matrices. 91 distance_type (str, default='AP'): 92 Distance metric to use. Options to choose from are: 'Manhattan', 'RMS' (Root Mean Square), 'AP' (Average Pairwise). 93 If `average_strands=True`, use 'distance_type=AP'. 94 n_components (int, default=2): 95 The number of principal components. 96 rsid_or_chrompos (int, default=2): 97 Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 98 """ 99 self.__snpobj = snpobj 100 self.__laiobj = laiobj 101 self.__labels_file = labels_file 102 self.__ancestry = self._define_ancestry(ancestry, laiobj.ancestry_map) 103 self.__is_masked = is_masked 104 self.__average_strands = average_strands 105 self.__force_nan_incomplete_strands = force_nan_incomplete_strands 106 self.__groups_to_remove = groups_to_remove 107 self.__min_percent_snps = min_percent_snps 108 self.__group_snp_frequencies_only = group_snp_frequencies_only 109 self.__is_weighted = is_weighted 110 self.__save_masks = save_masks 111 self.__load_masks = load_masks 112 self.__masks_file = masks_file 113 self.__distance_type = distance_type 114 self.__n_components = n_components 115 self.__rsid_or_chrompos = rsid_or_chrompos 116 self.__X_new_ = None # Store transformed SNP data 117 self.__haplotypes_ = None # Store haplotypes after filtering if min_percent_snps > 0 118 self.__samples_ = None # Store samples after filtering if min_percent_snps > 0 119 self.__variants_id_ = None # Store variants ID (after filtering SNPs not in laiobj) 120 121 # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided 122 if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None: 123 self.fit_transform(snpobj, laiobj, labels_file, ancestry) 124 125 def __getitem__(self, key): 126 """ 127 To access an attribute of the class using the square bracket notation, 128 similar to a dictionary. 129 """ 130 try: 131 return getattr(self, key) 132 except AttributeError: 133 raise KeyError(f'Invalid key: {key}') 134 135 def __setitem__(self, key, value): 136 """ 137 To set an attribute of the class using the square bracket notation, 138 similar to a dictionary. 139 """ 140 try: 141 setattr(self, key, value) 142 except AttributeError: 143 raise KeyError(f'Invalid key: {key}') 144 145 def copy(self) -> 'maasMDS': 146 """ 147 Create and return a copy of `self`. 148 149 Returns: 150 **maasMDS:** 151 A new instance of the current object. 152 """ 153 return copy.copy(self) 154 155 @property 156 def snpobj(self) -> Optional['SNPObject']: 157 """ 158 Retrieve `snpobj`. 159 160 Returns: 161 **SNPObject:** A SNPObject instance. 162 """ 163 return self.__snpobj 164 165 @snpobj.setter 166 def snpobj(self, x: 'SNPObject') -> None: 167 """ 168 Update `snpobj`. 169 """ 170 self.__snpobj = x 171 172 @property 173 def laiobj(self) -> Optional['LocalAncestryObject']: 174 """ 175 Retrieve `laiobj`. 176 177 Returns: 178 **LocalAncestryObject:** A LAIObject instance. 179 """ 180 return self.__laiobj 181 182 @laiobj.setter 183 def laiobj(self, x: 'LocalAncestryObject') -> None: 184 """ 185 Update `laiobj`. 186 """ 187 self.__laiobj = x 188 189 @property 190 def labels_file(self) -> Optional[str]: 191 """ 192 Retrieve `labels_file`. 193 194 Returns: 195 **str:** 196 Path to the labels file in `.tsv` format. 197 """ 198 return self.__labels_file 199 200 @labels_file.setter 201 def labels_file(self, x: str) -> None: 202 """ 203 Update `labels_file`. 204 """ 205 self.__labels_file = x 206 207 @property 208 def ancestry(self) -> Optional[str]: 209 """ 210 Retrieve `ancestry`. 211 212 Returns: 213 **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. 214 """ 215 return self.__ancestry 216 217 @ancestry.setter 218 def ancestry(self, x: str) -> None: 219 """ 220 Update `ancestry`. 221 """ 222 self.__ancestry = x 223 224 @property 225 def is_masked(self) -> bool: 226 """ 227 Retrieve `is_masked`. 228 229 Returns: 230 **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise. 231 """ 232 return self.__is_masked 233 234 @is_masked.setter 235 def is_masked(self, x: bool) -> None: 236 """ 237 Update `is_masked`. 238 """ 239 self.__is_masked = x 240 241 @property 242 def average_strands(self) -> bool: 243 """ 244 Retrieve `average_strands`. 245 246 Returns: 247 **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 248 """ 249 return self.__average_strands 250 251 @average_strands.setter 252 def average_strands(self, x: bool) -> None: 253 """ 254 Update `average_strands`. 255 """ 256 self.__average_strands = x 257 258 @property 259 def force_nan_incomplete_strands(self) -> bool: 260 """ 261 Retrieve `force_nan_incomplete_strands`. 262 263 Returns: 264 **bool**: If `True`, sets the result to NaN if either haplotype in a pair is NaN. 265 Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). 266 """ 267 return self.__force_nan_incomplete_strands 268 269 @force_nan_incomplete_strands.setter 270 def force_nan_incomplete_strands(self, x: bool) -> None: 271 """ 272 Update `force_nan_incomplete_strands`. 273 """ 274 self.__force_nan_incomplete_strands = x 275 276 @property 277 def is_weighted(self) -> bool: 278 """ 279 Retrieve `is_weighted`. 280 281 Returns: 282 **bool:** True if weights are provided in the labels file, or False otherwise. 283 """ 284 return self.__is_weighted 285 286 @is_weighted.setter 287 def is_weighted(self, x: bool) -> None: 288 """ 289 Update `is_weighted`. 290 """ 291 self.__is_weighted = x 292 293 @property 294 def groups_to_remove(self) -> Dict[int, List[str]]: 295 """ 296 Retrieve `groups_to_remove`. 297 298 Returns: 299 **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 300 lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 301 """ 302 return self.__groups_to_remove 303 304 @groups_to_remove.setter 305 def groups_to_remove(self, x: Dict[int, List[str]]) -> None: 306 """ 307 Update `groups_to_remove`. 308 """ 309 self.__groups_to_remove = x 310 311 @property 312 def min_percent_snps(self) -> float: 313 """ 314 Retrieve `min_percent_snps`. 315 316 Returns: 317 **float:** 318 Minimum percentage of SNPs to be known in an individual for an individual to be included in the analysis. 319 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 320 """ 321 return self.__min_percent_snps 322 323 @min_percent_snps.setter 324 def min_percent_snps(self, x: float) -> None: 325 """ 326 Update `min_percent_snps`. 327 """ 328 self.__min_percent_snps = x 329 330 @property 331 def group_snp_frequencies_only(self) -> bool: 332 """ 333 Retrieve `group_snp_frequencies_only`. 334 335 Returns: 336 **bool:** 337 If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies 338 when `is_weighted` is set to True and a `combination` column is provided in the `labels_file`, meaning individuals are 339 aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone 340 or on both individual-level and group-level SNP frequencies when `is_weighted` is True and a `combination` column is provided. 341 """ 342 return self.__group_snp_frequencies_only 343 344 @group_snp_frequencies_only.setter 345 def group_snp_frequencies_only(self, x: bool) -> None: 346 """ 347 Update `group_snp_frequencies_only`. 348 """ 349 self.__group_snp_frequencies_only = x 350 351 @property 352 def save_masks(self) -> bool: 353 """ 354 Retrieve `save_masks`. 355 356 Returns: 357 **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 358 """ 359 return self.__save_masks 360 361 @save_masks.setter 362 def save_masks(self, x: bool) -> None: 363 """ 364 Update `save_masks`. 365 """ 366 self.__save_masks = x 367 368 @property 369 def load_masks(self) -> bool: 370 """ 371 Retrieve `load_masks`. 372 373 Returns: 374 **bool:** 375 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 376 by `masks_file`, or False otherwise. 377 """ 378 return self.__load_masks 379 380 @load_masks.setter 381 def load_masks(self, x: bool) -> None: 382 """ 383 Update `load_masks`. 384 """ 385 self.__load_masks = x 386 387 @property 388 def masks_file(self) -> Union[str, pathlib.Path]: 389 """ 390 Retrieve `masks_file`. 391 392 Returns: 393 **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices. 394 """ 395 return self.__masks_file 396 397 @masks_file.setter 398 def masks_file(self, x: Union[str, pathlib.Path]) -> None: 399 """ 400 Update `masks_file`. 401 """ 402 self.__masks_file = x 403 404 @property 405 def distance_type(self) -> str: 406 """ 407 Retrieve `distance_type`. 408 409 Returns: 410 **str:** 411 Distance metric to use. Options to choose from are: 'Manhattan', 'RMS' (Root Mean Square), 'AP' (Average Pairwise). 412 If `average_strands=True`, use 'distance_type=AP'. 413 """ 414 return self.__distance_type 415 416 @distance_type.setter 417 def distance_type(self, x: str) -> None: 418 """ 419 Update `distance_type`. 420 """ 421 self.__distance_type = x 422 423 @property 424 def n_components(self) -> int: 425 """ 426 Retrieve `n_components`. 427 428 Returns: 429 **int:** The number of principal components. 430 """ 431 return self.__n_components 432 433 @n_components.setter 434 def n_components(self, x: int) -> None: 435 """ 436 Update `n_components`. 437 """ 438 self.__n_components = x 439 440 @property 441 def rsid_or_chrompos(self) -> int: 442 """ 443 Retrieve `rsid_or_chrompos`. 444 445 Returns: 446 **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 447 """ 448 return self.__rsid_or_chrompos 449 450 @rsid_or_chrompos.setter 451 def rsid_or_chrompos(self, x: int) -> None: 452 """ 453 Update `rsid_or_chrompos`. 454 """ 455 self.__rsid_or_chrompos = x 456 457 @property 458 def X_new_(self) -> Optional[np.ndarray]: 459 """ 460 Retrieve `X_new_`. 461 462 Returns: 463 **array of shape (n_haplotypes_, n_components):** 464 The transformed SNP data projected onto the `n_components` principal components. 465 n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 466 (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 467 `(n_samples * 2, n_components)`. 468 """ 469 return self.__X_new_ 470 471 @X_new_.setter 472 def X_new_(self, x: np.ndarray) -> None: 473 """ 474 Update `X_new_`. 475 """ 476 self.__X_new_ = x 477 478 @property 479 def haplotypes_(self) -> Optional[List[str]]: 480 """ 481 Retrieve `haplotypes_`. 482 483 Returns: 484 list of str: 485 A list of unique haplotype identifiers. 486 """ 487 if isinstance(self.__haplotypes_, np.ndarray): 488 return self.__haplotypes_.ravel().tolist() # Flatten and convert NumPy array to a list 489 elif isinstance(self.__haplotypes_, list): 490 if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray): 491 return self.__haplotypes_[0].ravel().tolist() # Handle list containing a single array 492 return self.__haplotypes_ # Already a flat list 493 elif self.__haplotypes_ is None: 494 return None # If no haplotypes are set 495 else: 496 raise TypeError("`haplotypes_` must be a list or a NumPy array.") 497 498 @haplotypes_.setter 499 def haplotypes_(self, x: Union[np.ndarray, List[str]]) -> None: 500 """ 501 Update `haplotypes_`. 502 """ 503 if isinstance(x, np.ndarray): 504 self.__haplotypes_ = x.ravel().tolist() # Flatten and convert to a list 505 elif isinstance(x, list): 506 if len(x) == 1 and isinstance(x[0], np.ndarray): # Handle list containing a single array 507 self.__haplotypes_ = x[0].ravel().tolist() 508 else: 509 self.__haplotypes_ = x # Use directly if already a list 510 else: 511 raise TypeError("`x` must be a list or a NumPy array.") 512 513 @property 514 def samples_(self) -> Optional[List[str]]: 515 """ 516 Retrieve `samples_`. 517 518 Returns: 519 list of str: 520 A list of sample identifiers based on `haplotypes_` and `average_strands`. 521 """ 522 haplotypes = self.haplotypes_ 523 if haplotypes is None: 524 return None 525 if self.__average_strands: 526 return haplotypes 527 else: 528 return [x[:-2] for x in haplotypes] 529 530 @property 531 def variants_id_(self) -> Optional[np.ndarray]: 532 """ 533 Retrieve `variants_id_`. 534 535 Returns: 536 **array of shape (n_snp,):** 537 An array containing unique identifiers (IDs) for each SNP, 538 potentially reduced if there are SNPs not present in the `laiboj`. 539 The format will depend on `rsid_or_chrompos`. 540 """ 541 return self.__variants_id_ 542 543 @variants_id_.setter 544 def variants_id_(self, x: np.ndarray) -> None: 545 """ 546 Update `variants_id_`. 547 """ 548 self.__variants_id_ = x 549 550 @property 551 def n_haplotypes(self) -> Optional[int]: 552 """ 553 Retrieve `n_haplotypes`. 554 555 Returns: 556 **int:** 557 The total number of haplotypes, potentially reduced if filtering is applied 558 (`min_percent_snps > 0`). 559 """ 560 return len(self.__haplotypes_) 561 562 @property 563 def n_samples(self) -> Optional[int]: 564 """ 565 Retrieve `n_samples`. 566 567 Returns: 568 **int:** 569 The total number of samples, potentially reduced if filtering is applied 570 (`min_percent_snps > 0`). 571 """ 572 return len(np.unique(self.samples_)) 573 574 @staticmethod 575 def _define_ancestry(ancestry, ancestry_map): 576 """ 577 Determine the ancestry index based on different input types. 578 579 Args: 580 ancestry (int or str): The ancestry input, which can be: 581 - An integer (e.g., 0, 1, 2). 582 - A string representation of an integer (e.g., '0', '1'). 583 - A string matching one of the ancestry map values (e.g., 'Africa'). 584 ancestry_map (dict): A dictionary mapping ancestry indices (as strings) to ancestry names. 585 586 Returns: 587 int: The corresponding ancestry index. 588 """ 589 if isinstance(ancestry, int): 590 return ancestry 591 elif isinstance(ancestry, str) and ancestry.isdigit(): 592 return int(ancestry) 593 elif ancestry in ancestry_map.values(): 594 return int(next(key for key, value in ancestry_map.items() if value == ancestry)) 595 else: 596 raise ValueError(f"Invalid ancestry input: {ancestry}") 597 598 @staticmethod 599 def _load_masks_file(masks_file): 600 mask_files = np.load(masks_file, allow_pickle=True) 601 mask = mask_files['mask'] 602 rs_ID_list = mask_files['rs_ID_list'] 603 ind_ID_list = mask_files['ind_ID_list'] 604 groups = mask_files['labels'] 605 weights = mask_files['weights'] 606 return mask, rs_ID_list, ind_ID_list, groups, weights 607 608 def fit_transform( 609 self, 610 snpobj: Optional['SNPObject'] = None, 611 laiobj: Optional['LocalAncestryObject'] = None, 612 labels_file: Optional[str] = None, 613 ancestry: Optional[str] = None, 614 average_strands: Optional[bool] = None 615 ) -> np.ndarray: 616 """ 617 Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data. 618 619 Args: 620 snpobj (SNPObject, optional): 621 A SNPObject instance. 622 laiobj (LAIObject, optional): 623 A LAIObject instance. 624 labels_file (str, optional): 625 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 626 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 627 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 628 combined into groups, with respective weights. 629 ancestry (str, optional): 630 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 631 average_strands (bool, optional): 632 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 633 If None, defaults to `self.average_strands`. 634 635 Returns: 636 **array of shape (n_samples, n_components):** 637 The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`. 638 """ 639 if snpobj is None: 640 snpobj = self.snpobj 641 if laiobj is None: 642 laiobj = self.laiobj 643 if labels_file is None: 644 labels_file = self.labels_file 645 if ancestry is None: 646 ancestry = self.ancestry 647 if average_strands is None: 648 average_strands = self.average_strands 649 650 if not self.is_masked: 651 self.ancestry = 1 652 if self.load_masks: 653 # Load precomputed ancestry-based masked genotype matrix, SNP identifiers, haplotype identifiers, and weights 654 mask, variants_id, haplotypes, _, weights = self._load_masks_file(self.masks_file) 655 else: 656 # Process genotype data with optional ancestry-based masking and return the corresponding SNP and individual identifiers 657 mask, variants_id, haplotypes = process_calldata_gt( 658 self.snpobj, 659 self.laiobj, 660 self.ancestry, 661 self.average_strands, 662 self.force_nan_incomplete_strands, 663 self.is_masked, 664 self.rsid_or_chrompos 665 ) 666 667 # Process individual genomic labels and weights, aligning them with a masked genotype matrix by 668 # filtering out low-coverage individuals, reordering data to match the matrix structure, and 669 # handling group-based adjustments 670 mask, haplotypes, groups, weights = process_labels_weights( 671 self.labels_file, 672 mask, 673 variants_id, 674 haplotypes, 675 self.average_strands, 676 self.ancestry, 677 self.min_percent_snps, 678 self.group_snp_frequencies_only, 679 self.groups_to_remove, 680 self.is_weighted, 681 self.save_masks, 682 self.masks_file 683 ) 684 685 distance_list = [[distance_mat(first=mask[self.ancestry], dist_func=self.distance_type)]] 686 687 self.X_new_ = mds_transform(distance_list, groups, weights, haplotypes, self.n_components) 688 689 self.haplotypes_ = haplotypes 690 self.variants_id_ = variants_id
A class for performing multiple array ancestry-specific multidimensional scaling (maasMDS) on SNP data.
The maasMDS class focuses on genotype segments from the ancestry of interest when the is_masked
flag is set to True
. It offers
flexible processing options, allowing either separate handling of masked haplotype strands or combining (averaging) strands into a
single composite representation for each individual. Moreover, the analysis can be performed on individual-level data, group-level SNP
frequencies, or a combination of both.
This class supports both separate and averaged strand processing for SNP data. If the snpobj
,
laiobj
, labels_file
, and ancestry
parameters are all provided during instantiation,
the fit_transform
method will be automatically called, applying the specified maasMDS method to transform
the data upon instantiation.
27 def __init__( 28 self, 29 snpobj: Optional['SNPObject'] = None, 30 laiobj: Optional['LocalAncestryObject'] = None, 31 labels_file: Optional[str] = None, 32 ancestry: Optional[Union[int, str]] = None, 33 is_masked: bool = True, 34 average_strands: bool = False, 35 force_nan_incomplete_strands: bool = False, 36 is_weighted: bool = False, 37 groups_to_remove: Dict[int, List[str]] = {}, 38 min_percent_snps: float = 4, 39 group_snp_frequencies_only: bool = True, 40 save_masks: bool = False, 41 load_masks: bool = False, 42 masks_file: Union[str, pathlib.Path] = 'masks.npz', 43 distance_type: str = 'AP', 44 n_components: int = 2, 45 rsid_or_chrompos: int = 2 46 ): 47 """ 48 Args: 49 snpobj (SNPObject, optional): 50 A SNPObject instance. 51 laiobj (LAIObject, optional): 52 A LAIObject instance. 53 labels_file (str, optional): 54 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 55 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 56 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 57 combined into groups, with respective weights. 58 ancestry (int or str, optional): 59 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. The ancestry input can be: 60 - An integer (e.g., 0, 1, 2). 61 - A string representation of an integer (e.g., '0', '1'). 62 - A string matching one of the ancestry map values (e.g., 'Africa'). 63 is_masked (bool, default=True): 64 If `True`, applies ancestry-specific masking to the genotype matrix, retaining only genotype data 65 corresponding to the specified `ancestry`. If `False`, uses the full, unmasked genotype matrix. 66 average_strands (bool, default=False): 67 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 68 force_nan_incomplete_strands (bool): 69 If `True`, sets the result to NaN if either haplotype in a pair is NaN. 70 Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). 71 is_weighted (bool, default=False): 72 True if weights are provided in the labels file, or False otherwise. 73 groups_to_remove (dict of int to list of str, default={}): 74 Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 75 lists of groups to remove for each array. 76 Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 77 min_percent_snps (float, default=4.0): 78 Minimum percentage of SNPs to be known in an individual for an individual to be included in the analysis. 79 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 80 group_snp_frequencies_only (bool, default=True): 81 If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when `is_weighted` is 82 set to True and a `combination` column is provided in the `labels_file`, meaning individuals are aggregated into groups based on their assigned 83 labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies when 84 `is_weighted` is True and a `combination` column is provided. 85 save_masks (bool, default=False): 86 True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 87 load_masks (bool, default=False): 88 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified by `masks_file`, or False otherwise. 89 masks_file (str or pathlib.Path, default='masks.npz'): 90 Path to the `.npz` file used for saving/loading masked matrices. 91 distance_type (str, default='AP'): 92 Distance metric to use. Options to choose from are: 'Manhattan', 'RMS' (Root Mean Square), 'AP' (Average Pairwise). 93 If `average_strands=True`, use 'distance_type=AP'. 94 n_components (int, default=2): 95 The number of principal components. 96 rsid_or_chrompos (int, default=2): 97 Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 98 """ 99 self.__snpobj = snpobj 100 self.__laiobj = laiobj 101 self.__labels_file = labels_file 102 self.__ancestry = self._define_ancestry(ancestry, laiobj.ancestry_map) 103 self.__is_masked = is_masked 104 self.__average_strands = average_strands 105 self.__force_nan_incomplete_strands = force_nan_incomplete_strands 106 self.__groups_to_remove = groups_to_remove 107 self.__min_percent_snps = min_percent_snps 108 self.__group_snp_frequencies_only = group_snp_frequencies_only 109 self.__is_weighted = is_weighted 110 self.__save_masks = save_masks 111 self.__load_masks = load_masks 112 self.__masks_file = masks_file 113 self.__distance_type = distance_type 114 self.__n_components = n_components 115 self.__rsid_or_chrompos = rsid_or_chrompos 116 self.__X_new_ = None # Store transformed SNP data 117 self.__haplotypes_ = None # Store haplotypes after filtering if min_percent_snps > 0 118 self.__samples_ = None # Store samples after filtering if min_percent_snps > 0 119 self.__variants_id_ = None # Store variants ID (after filtering SNPs not in laiobj) 120 121 # Fit and transform if a `snpobj`, `laiobj`, `labels_file`, and `ancestry` are provided 122 if self.snpobj is not None and self.laiobj is not None and self.labels_file is not None and self.ancestry is not None: 123 self.fit_transform(snpobj, laiobj, labels_file, ancestry)
Arguments:
- snpobj (SNPObject, optional): A SNPObject instance.
- laiobj (LAIObject, optional): A LAIObject instance.
- labels_file (str, optional): Path to the labels file in .tsv format. The first column,
indID
, contains the individual identifiers, and the second column,label
, specifies the groups for all individuals. Ifis_weighted=True
, aweight
column with individual weights is required. Optionally,combination
andcombination_weight
columns can specify sets of individuals to be combined into groups, with respective weights. - ancestry (int or str, optional): Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at
0
. The ancestry input can be:- An integer (e.g., 0, 1, 2).
- A string representation of an integer (e.g., '0', '1').
- A string matching one of the ancestry map values (e.g., 'Africa').
- is_masked (bool, default=True): If
True
, applies ancestry-specific masking to the genotype matrix, retaining only genotype data corresponding to the specifiedancestry
. IfFalse
, uses the full, unmasked genotype matrix. - average_strands (bool, default=False): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
- force_nan_incomplete_strands (bool): If
True
, sets the result to NaN if either haplotype in a pair is NaN. Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). - is_weighted (bool, default=False): True if weights are provided in the labels file, or False otherwise.
- groups_to_remove (dict of int to list of str, default={}): Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are
lists of groups to remove for each array.
Example:
{1: ['group1', 'group2'], 2: [], 3: ['group3']}
. - min_percent_snps (float, default=4.0): Minimum percentage of SNPs to be known in an individual for an individual to be included in the analysis. All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
- group_snp_frequencies_only (bool, default=True): If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when
is_weighted
is set to True and acombination
column is provided in thelabels_file
, meaning individuals are aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies whenis_weighted
is True and acombination
column is provided. - save_masks (bool, default=False): True if the masked matrices are to be saved in a
.npz
file, or False otherwise. - load_masks (bool, default=False): True if the masked matrices are to be loaded from a pre-existing
.npz
file specified bymasks_file
, or False otherwise. - masks_file (str or pathlib.Path, default='masks.npz'): Path to the
.npz
file used for saving/loading masked matrices. - distance_type (str, default='AP'): Distance metric to use. Options to choose from are: 'Manhattan', 'RMS' (Root Mean Square), 'AP' (Average Pairwise).
If
average_strands=True
, use 'distance_type=AP'. - n_components (int, default=2): The number of principal components.
- rsid_or_chrompos (int, default=2): Format indicator for SNP IDs in the SNP data. Use 1 for
rsID
format or 2 forchromosome_position
.
145 def copy(self) -> 'maasMDS': 146 """ 147 Create and return a copy of `self`. 148 149 Returns: 150 **maasMDS:** 151 A new instance of the current object. 152 """ 153 return copy.copy(self)
Create and return a copy of self
.
Returns:
maasMDS: A new instance of the current object.
207 @property 208 def ancestry(self) -> Optional[str]: 209 """ 210 Retrieve `ancestry`. 211 212 Returns: 213 **str:** Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at `0`. 214 """ 215 return self.__ancestry
Retrieve ancestry
.
Returns:
str: Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at
0
.
224 @property 225 def is_masked(self) -> bool: 226 """ 227 Retrieve `is_masked`. 228 229 Returns: 230 **bool:** True if an ancestry file is passed for ancestry-specific masking, or False otherwise. 231 """ 232 return self.__is_masked
Retrieve is_masked
.
Returns:
bool: True if an ancestry file is passed for ancestry-specific masking, or False otherwise.
241 @property 242 def average_strands(self) -> bool: 243 """ 244 Retrieve `average_strands`. 245 246 Returns: 247 **bool:** True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 248 """ 249 return self.__average_strands
Retrieve average_strands
.
Returns:
bool: True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
258 @property 259 def force_nan_incomplete_strands(self) -> bool: 260 """ 261 Retrieve `force_nan_incomplete_strands`. 262 263 Returns: 264 **bool**: If `True`, sets the result to NaN if either haplotype in a pair is NaN. 265 Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1). 266 """ 267 return self.__force_nan_incomplete_strands
Retrieve force_nan_incomplete_strands
.
Returns:
bool: If
True
, sets the result to NaN if either haplotype in a pair is NaN. Otherwise, computes the mean while ignoring NaNs (e.g., 0|NaN -> 0, 1|NaN -> 1).
276 @property 277 def is_weighted(self) -> bool: 278 """ 279 Retrieve `is_weighted`. 280 281 Returns: 282 **bool:** True if weights are provided in the labels file, or False otherwise. 283 """ 284 return self.__is_weighted
Retrieve is_weighted
.
Returns:
bool: True if weights are provided in the labels file, or False otherwise.
293 @property 294 def groups_to_remove(self) -> Dict[int, List[str]]: 295 """ 296 Retrieve `groups_to_remove`. 297 298 Returns: 299 **dict of int to list of str:** Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are 300 lists of groups to remove for each array. Example: `{1: ['group1', 'group2'], 2: [], 3: ['group3']}`. 301 """ 302 return self.__groups_to_remove
Retrieve groups_to_remove
.
Returns:
dict of int to list of str: Dictionary specifying groups to exclude from analysis. Keys are array numbers, and values are lists of groups to remove for each array. Example:
{1: ['group1', 'group2'], 2: [], 3: ['group3']}
.
311 @property 312 def min_percent_snps(self) -> float: 313 """ 314 Retrieve `min_percent_snps`. 315 316 Returns: 317 **float:** 318 Minimum percentage of SNPs to be known in an individual for an individual to be included in the analysis. 319 All individuals with fewer percent of unmasked SNPs than this threshold will be excluded. 320 """ 321 return self.__min_percent_snps
Retrieve min_percent_snps
.
Returns:
float: Minimum percentage of SNPs to be known in an individual for an individual to be included in the analysis. All individuals with fewer percent of unmasked SNPs than this threshold will be excluded.
330 @property 331 def group_snp_frequencies_only(self) -> bool: 332 """ 333 Retrieve `group_snp_frequencies_only`. 334 335 Returns: 336 **bool:** 337 If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies 338 when `is_weighted` is set to True and a `combination` column is provided in the `labels_file`, meaning individuals are 339 aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone 340 or on both individual-level and group-level SNP frequencies when `is_weighted` is True and a `combination` column is provided. 341 """ 342 return self.__group_snp_frequencies_only
Retrieve group_snp_frequencies_only
.
Returns:
bool: If True, mdPCA is performed exclusively on group-level SNP frequencies, ignoring individual-level data. This applies when
is_weighted
is set to True and acombination
column is provided in thelabels_file
, meaning individuals are aggregated into groups based on their assigned labels. If False, mdPCA is performed on individual-level SNP data alone or on both individual-level and group-level SNP frequencies whenis_weighted
is True and acombination
column is provided.
351 @property 352 def save_masks(self) -> bool: 353 """ 354 Retrieve `save_masks`. 355 356 Returns: 357 **bool:** True if the masked matrices are to be saved in a `.npz` file, or False otherwise. 358 """ 359 return self.__save_masks
Retrieve save_masks
.
Returns:
bool: True if the masked matrices are to be saved in a
.npz
file, or False otherwise.
368 @property 369 def load_masks(self) -> bool: 370 """ 371 Retrieve `load_masks`. 372 373 Returns: 374 **bool:** 375 True if the masked matrices are to be loaded from a pre-existing `.npz` file specified 376 by `masks_file`, or False otherwise. 377 """ 378 return self.__load_masks
Retrieve load_masks
.
Returns:
bool: True if the masked matrices are to be loaded from a pre-existing
.npz
file specified bymasks_file
, or False otherwise.
387 @property 388 def masks_file(self) -> Union[str, pathlib.Path]: 389 """ 390 Retrieve `masks_file`. 391 392 Returns: 393 **str or pathlib.Path:** Path to the `.npz` file used for saving/loading masked matrices. 394 """ 395 return self.__masks_file
Retrieve masks_file
.
Returns:
str or pathlib.Path: Path to the
.npz
file used for saving/loading masked matrices.
404 @property 405 def distance_type(self) -> str: 406 """ 407 Retrieve `distance_type`. 408 409 Returns: 410 **str:** 411 Distance metric to use. Options to choose from are: 'Manhattan', 'RMS' (Root Mean Square), 'AP' (Average Pairwise). 412 If `average_strands=True`, use 'distance_type=AP'. 413 """ 414 return self.__distance_type
Retrieve distance_type
.
Returns:
str: Distance metric to use. Options to choose from are: 'Manhattan', 'RMS' (Root Mean Square), 'AP' (Average Pairwise). If
average_strands=True
, use 'distance_type=AP'.
440 @property 441 def rsid_or_chrompos(self) -> int: 442 """ 443 Retrieve `rsid_or_chrompos`. 444 445 Returns: 446 **int:** Format indicator for SNP IDs in the SNP data. Use 1 for `rsID` format or 2 for `chromosome_position`. 447 """ 448 return self.__rsid_or_chrompos
Retrieve rsid_or_chrompos
.
Returns:
int: Format indicator for SNP IDs in the SNP data. Use 1 for
rsID
format or 2 forchromosome_position
.
457 @property 458 def X_new_(self) -> Optional[np.ndarray]: 459 """ 460 Retrieve `X_new_`. 461 462 Returns: 463 **array of shape (n_haplotypes_, n_components):** 464 The transformed SNP data projected onto the `n_components` principal components. 465 n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied 466 (`min_percent_snps > 0`). For diploid individuals without filtering, the shape is 467 `(n_samples * 2, n_components)`. 468 """ 469 return self.__X_new_
Retrieve X_new_
.
Returns:
array of shape (n_haplotypes_, n_components): The transformed SNP data projected onto the
n_components
principal components. n_haplotypes_ is the number of haplotypes, potentially reduced if filtering is applied (min_percent_snps > 0
). For diploid individuals without filtering, the shape is(n_samples * 2, n_components)
.
478 @property 479 def haplotypes_(self) -> Optional[List[str]]: 480 """ 481 Retrieve `haplotypes_`. 482 483 Returns: 484 list of str: 485 A list of unique haplotype identifiers. 486 """ 487 if isinstance(self.__haplotypes_, np.ndarray): 488 return self.__haplotypes_.ravel().tolist() # Flatten and convert NumPy array to a list 489 elif isinstance(self.__haplotypes_, list): 490 if len(self.__haplotypes_) == 1 and isinstance(self.__haplotypes_[0], np.ndarray): 491 return self.__haplotypes_[0].ravel().tolist() # Handle list containing a single array 492 return self.__haplotypes_ # Already a flat list 493 elif self.__haplotypes_ is None: 494 return None # If no haplotypes are set 495 else: 496 raise TypeError("`haplotypes_` must be a list or a NumPy array.")
513 @property 514 def samples_(self) -> Optional[List[str]]: 515 """ 516 Retrieve `samples_`. 517 518 Returns: 519 list of str: 520 A list of sample identifiers based on `haplotypes_` and `average_strands`. 521 """ 522 haplotypes = self.haplotypes_ 523 if haplotypes is None: 524 return None 525 if self.__average_strands: 526 return haplotypes 527 else: 528 return [x[:-2] for x in haplotypes]
Retrieve samples_
.
Returns:
list of str: A list of sample identifiers based on
haplotypes_
andaverage_strands
.
530 @property 531 def variants_id_(self) -> Optional[np.ndarray]: 532 """ 533 Retrieve `variants_id_`. 534 535 Returns: 536 **array of shape (n_snp,):** 537 An array containing unique identifiers (IDs) for each SNP, 538 potentially reduced if there are SNPs not present in the `laiboj`. 539 The format will depend on `rsid_or_chrompos`. 540 """ 541 return self.__variants_id_
Retrieve variants_id_
.
Returns:
array of shape (n_snp,): An array containing unique identifiers (IDs) for each SNP, potentially reduced if there are SNPs not present in the
laiboj
. The format will depend onrsid_or_chrompos
.
550 @property 551 def n_haplotypes(self) -> Optional[int]: 552 """ 553 Retrieve `n_haplotypes`. 554 555 Returns: 556 **int:** 557 The total number of haplotypes, potentially reduced if filtering is applied 558 (`min_percent_snps > 0`). 559 """ 560 return len(self.__haplotypes_)
Retrieve n_haplotypes
.
Returns:
int: The total number of haplotypes, potentially reduced if filtering is applied (
min_percent_snps > 0
).
562 @property 563 def n_samples(self) -> Optional[int]: 564 """ 565 Retrieve `n_samples`. 566 567 Returns: 568 **int:** 569 The total number of samples, potentially reduced if filtering is applied 570 (`min_percent_snps > 0`). 571 """ 572 return len(np.unique(self.samples_))
Retrieve n_samples
.
Returns:
int: The total number of samples, potentially reduced if filtering is applied (
min_percent_snps > 0
).
608 def fit_transform( 609 self, 610 snpobj: Optional['SNPObject'] = None, 611 laiobj: Optional['LocalAncestryObject'] = None, 612 labels_file: Optional[str] = None, 613 ancestry: Optional[str] = None, 614 average_strands: Optional[bool] = None 615 ) -> np.ndarray: 616 """ 617 Fit the model to the SNP data stored in the provided `snpobj` and apply the dimensionality reduction on the same SNP data. 618 619 Args: 620 snpobj (SNPObject, optional): 621 A SNPObject instance. 622 laiobj (LAIObject, optional): 623 A LAIObject instance. 624 labels_file (str, optional): 625 Path to the labels file in .tsv format. The first column, `indID`, contains the individual identifiers, and the second 626 column, `label`, specifies the groups for all individuals. If `is_weighted=True`, a `weight` column with individual 627 weights is required. Optionally, `combination` and `combination_weight` columns can specify sets of individuals to be 628 combined into groups, with respective weights. 629 ancestry (str, optional): 630 Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0. 631 average_strands (bool, optional): 632 True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise. 633 If None, defaults to `self.average_strands`. 634 635 Returns: 636 **array of shape (n_samples, n_components):** 637 The transformed SNP data projected onto the `n_components` principal components, stored in `self.X_new_`. 638 """ 639 if snpobj is None: 640 snpobj = self.snpobj 641 if laiobj is None: 642 laiobj = self.laiobj 643 if labels_file is None: 644 labels_file = self.labels_file 645 if ancestry is None: 646 ancestry = self.ancestry 647 if average_strands is None: 648 average_strands = self.average_strands 649 650 if not self.is_masked: 651 self.ancestry = 1 652 if self.load_masks: 653 # Load precomputed ancestry-based masked genotype matrix, SNP identifiers, haplotype identifiers, and weights 654 mask, variants_id, haplotypes, _, weights = self._load_masks_file(self.masks_file) 655 else: 656 # Process genotype data with optional ancestry-based masking and return the corresponding SNP and individual identifiers 657 mask, variants_id, haplotypes = process_calldata_gt( 658 self.snpobj, 659 self.laiobj, 660 self.ancestry, 661 self.average_strands, 662 self.force_nan_incomplete_strands, 663 self.is_masked, 664 self.rsid_or_chrompos 665 ) 666 667 # Process individual genomic labels and weights, aligning them with a masked genotype matrix by 668 # filtering out low-coverage individuals, reordering data to match the matrix structure, and 669 # handling group-based adjustments 670 mask, haplotypes, groups, weights = process_labels_weights( 671 self.labels_file, 672 mask, 673 variants_id, 674 haplotypes, 675 self.average_strands, 676 self.ancestry, 677 self.min_percent_snps, 678 self.group_snp_frequencies_only, 679 self.groups_to_remove, 680 self.is_weighted, 681 self.save_masks, 682 self.masks_file 683 ) 684 685 distance_list = [[distance_mat(first=mask[self.ancestry], dist_func=self.distance_type)]] 686 687 self.X_new_ = mds_transform(distance_list, groups, weights, haplotypes, self.n_components) 688 689 self.haplotypes_ = haplotypes 690 self.variants_id_ = variants_id
Fit the model to the SNP data stored in the provided snpobj
and apply the dimensionality reduction on the same SNP data.
Arguments:
- snpobj (SNPObject, optional): A SNPObject instance.
- laiobj (LAIObject, optional): A LAIObject instance.
- labels_file (str, optional): Path to the labels file in .tsv format. The first column,
indID
, contains the individual identifiers, and the second column,label
, specifies the groups for all individuals. Ifis_weighted=True
, aweight
column with individual weights is required. Optionally,combination
andcombination_weight
columns can specify sets of individuals to be combined into groups, with respective weights. - ancestry (str, optional): Ancestry for which dimensionality reduction is to be performed. Ancestry counter starts at 0.
- average_strands (bool, optional): True if the haplotypes from the two parents are to be combined (averaged) for each individual, or False otherwise.
If None, defaults to
self.average_strands
.
Returns:
array of shape (n_samples, n_components): The transformed SNP data projected onto the
n_components
principal components, stored inself.X_new_
.