snputils.snp.genobj
17class SNPObject: 18 """ 19 A class for Single Nucleotide Polymorphism (SNP) data, with optional support for 20 SNP-level Local Ancestry Information (LAI). 21 """ 22 def __init__( 23 self, 24 calldata_gt: Optional[np.ndarray] = None, 25 samples: Optional[np.ndarray] = None, 26 variants_ref: Optional[np.ndarray] = None, 27 variants_alt: Optional[np.ndarray] = None, 28 variants_chrom: Optional[np.ndarray] = None, 29 variants_filter_pass: Optional[np.ndarray] = None, 30 variants_id: Optional[np.ndarray] = None, 31 variants_pos: Optional[np.ndarray] = None, 32 variants_qual: Optional[np.ndarray] = None, 33 calldata_lai: Optional[np.ndarray] = None, 34 ancestry_map: Optional[Dict[str, str]] = None 35 ) -> None: 36 """ 37 Args: 38 calldata_gt (array, optional): 39 An array containing genotype data for each sample. This array can be either 2D with shape 40 `(n_snps, n_samples)` if the paternal and maternal strands are averaged, or 3D with shape 41 `(n_snps, n_samples, 2)` if the strands are kept separate. 42 samples (array of shape (n_sampels,), optional): 43 An array containing unique sample identifiers. 44 variants_ref (array of shape (n_snps,), optional): 45 An array containing the reference allele for each SNP. 46 variants_alt (array of shape (n_snps,), optional): 47 An array containing the alternate allele for each SNP. 48 variants_chrom (array of shape (n_snps,), optional): 49 An array containing the chromosome for each SNP. 50 variants_filter_pass (array of shape (n_snps,), optional): 51 An array indicating whether each SNP passed control checks. 52 variants_id (array of shape (n_snps,), optional): 53 An array containing unique identifiers (IDs) for each SNP. 54 variants_pos (array of shape (n_snps,), optional): 55 An array containing the chromosomal positions for each SNP. 56 variants_qual (array of shape (n_snps,), optional): 57 An array containing the Phred-scaled quality score for each SNP. 58 calldata_lai (array, optional): 59 An array containing the ancestry for each SNP. This array can be either 2D with shape 60 `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2). 61 ancestry_map (dict of str to str, optional): 62 A dictionary mapping ancestry codes to region names. 63 """ 64 self.__calldata_gt = calldata_gt 65 self.__samples = samples 66 self.__variants_ref = variants_ref 67 self.__variants_alt = variants_alt 68 self.__variants_chrom = variants_chrom 69 self.__variants_filter_pass = variants_filter_pass 70 self.__variants_id = variants_id 71 self.__variants_pos = variants_pos 72 self.__variants_qual = variants_qual 73 self.__calldata_lai = calldata_lai 74 self.__ancestry_map = ancestry_map 75 76 self._sanity_check() 77 78 def __getitem__(self, key: str) -> Any: 79 """ 80 To access an attribute of the class using the square bracket notation, 81 similar to a dictionary. 82 """ 83 try: 84 return getattr(self, key) 85 except: 86 raise KeyError(f'Invalid key: {key}.') 87 88 def __setitem__(self, key: str, value: Any): 89 """ 90 To set an attribute of the class using the square bracket notation, 91 similar to a dictionary. 92 """ 93 try: 94 setattr(self, key, value) 95 except: 96 raise KeyError(f'Invalid key: {key}.') 97 98 @property 99 def calldata_gt(self) -> np.ndarray: 100 """ 101 Retrieve `calldata_gt`. 102 103 Returns: 104 **array:** 105 An array containing genotype data for each sample. This array can be either 2D with shape 106 `(n_snps, n_samples)` if the paternal and maternal strands are averaged, or 3D with shape 107 `(n_snps, n_samples, 2)` if the strands are kept separate. 108 """ 109 return self.__calldata_gt 110 111 @calldata_gt.setter 112 def calldata_gt(self, x: np.ndarray): 113 """ 114 Update `calldata_gt`. 115 """ 116 self.__calldata_gt = x 117 118 @property 119 def samples(self) -> Optional[np.ndarray]: 120 """ 121 Retrieve `samples`. 122 123 Returns: 124 **array of shape (n_sampels,):** 125 An array containing unique sample identifiers. 126 """ 127 return self.__samples 128 129 @samples.setter 130 def samples(self, x: Union[List, np.ndarray]): 131 """ 132 Update `samples`. 133 """ 134 self.__samples = np.asarray(x) 135 136 @property 137 def variants_ref(self) -> Optional[np.ndarray]: 138 """ 139 Retrieve `variants_ref`. 140 141 Returns: 142 **array of shape (n_snps,):** An array containing the reference allele for each SNP. 143 """ 144 return self.__variants_ref 145 146 @variants_ref.setter 147 def variants_ref(self, x: np.ndarray): 148 """ 149 Update `variants_ref`. 150 """ 151 self.__variants_ref = x 152 153 @property 154 def variants_alt(self) -> Optional[np.ndarray]: 155 """ 156 Retrieve `variants_alt`. 157 158 Returns: 159 **array of shape (n_snps,):** An array containing the alternate allele for each SNP. 160 """ 161 return self.__variants_alt 162 163 @variants_alt.setter 164 def variants_alt(self, x: np.ndarray): 165 """ 166 Update `variants_alt`. 167 """ 168 self.__variants_alt = x 169 170 @property 171 def variants_chrom(self) -> Optional[np.ndarray]: 172 """ 173 Retrieve `variants_chrom`. 174 175 Returns: 176 **array of shape (n_snps,):** An array containing the chromosome for each SNP. 177 """ 178 return self.__variants_chrom 179 180 @variants_chrom.setter 181 def variants_chrom(self, x: np.ndarray): 182 """ 183 Update `variants_chrom`. 184 """ 185 self.__variants_chrom = x 186 187 @property 188 def variants_filter_pass(self) -> Optional[np.ndarray]: 189 """ 190 Retrieve `variants_filter_pass`. 191 192 Returns: 193 **array of shape (n_snps,):** An array indicating whether each SNP passed control checks. 194 """ 195 return self.__variants_filter_pass 196 197 @variants_filter_pass.setter 198 def variants_filter_pass(self, x: np.ndarray): 199 """ 200 Update `variants_filter_pass`. 201 """ 202 self.__variants_filter_pass = x 203 204 @property 205 def variants_id(self) -> Optional[np.ndarray]: 206 """ 207 Retrieve `variants_id`. 208 209 Returns: 210 **array of shape (n_snps,):** An array containing unique identifiers (IDs) for each SNP. 211 """ 212 return self.__variants_id 213 214 @variants_id.setter 215 def variants_id(self, x: np.ndarray): 216 """ 217 Update `variants_id`. 218 """ 219 self.__variants_id = x 220 221 @property 222 def variants_pos(self) -> Optional[np.ndarray]: 223 """ 224 Retrieve `variants_pos`. 225 226 Returns: 227 **array of shape (n_snps,):** An array containing the chromosomal positions for each SNP. 228 """ 229 return self.__variants_pos 230 231 @variants_pos.setter 232 def variants_pos(self, x: np.ndarray): 233 """ 234 Update `variants_pos`. 235 """ 236 self.__variants_pos = x 237 238 @property 239 def variants_qual(self) -> Optional[np.ndarray]: 240 """ 241 Retrieve `variants_qual`. 242 243 Returns: 244 **array of shape (n_snps,):** An array containing the Phred-scaled quality score for each SNP. 245 """ 246 return self.__variants_qual 247 248 @variants_qual.setter 249 def variants_qual(self, x: np.ndarray): 250 """ 251 Update `variants_qual`. 252 """ 253 self.__variants_qual = x 254 255 @property 256 def calldata_lai(self) -> Optional[np.ndarray]: 257 """ 258 Retrieve `calldata_lai`. 259 260 Returns: 261 **array of shape (n_snps, n_samples, 2):** 262 An array containing the ancestry for each SNP. 263 """ 264 return self.__calldata_lai 265 266 @calldata_lai.setter 267 def calldata_lai(self, x: np.ndarray): 268 """ 269 Update `calldata_lai`. 270 """ 271 self.__calldata_lai = x 272 273 @property 274 def ancestry_map(self) -> Optional[Dict[str, str]]: 275 """ 276 Retrieve `ancestry_map`. 277 278 Returns: 279 **dict of str to str:** A dictionary mapping ancestry codes to region names. 280 """ 281 return self.__ancestry_map 282 283 @ancestry_map.setter 284 def ancestry_map(self, x): 285 """ 286 Update `ancestry_map`. 287 """ 288 self.__ancestry_map = x 289 290 @property 291 def n_samples(self) -> int: 292 """ 293 Retrieve `n_samples`. 294 295 Returns: 296 **int:** The total number of samples. 297 """ 298 if self.__samples is not None: 299 return len(self.__samples) 300 elif self.__calldata_gt is not None: 301 return self.__calldata_gt.shape[1] 302 elif self.__calldata_lai is not None: 303 return self.__calldata_lai.shape[1] 304 else: 305 raise ValueError("Unable to determine the total number of samples: no relevant data is available.") 306 307 @property 308 def n_snps(self) -> int: 309 """ 310 Retrieve `n_snps`. 311 312 Returns: 313 **int:** The total number of SNPs. 314 """ 315 # List of attributes that can indicate the number of SNPs 316 potential_attributes = [ 317 self.__calldata_gt, 318 self.__variants_ref, 319 self.__variants_alt, 320 self.__variants_chrom, 321 self.__variants_filter_pass, 322 self.__variants_id, 323 self.__variants_pos, 324 self.__variants_qual, 325 self.__calldata_lai 326 ] 327 328 # Check each attribute for its first dimension, which corresponds to `n_snps` 329 for attr in potential_attributes: 330 if attr is not None: 331 return attr.shape[0] 332 333 raise ValueError("Unable to determine the total number of SNPs: no relevant data is available.") 334 335 @property 336 def n_chrom(self) -> Optional[int]: 337 """ 338 Retrieve `n_chrom`. 339 340 Returns: 341 **int:** The total number of unique chromosomes in `variants_chrom`. 342 """ 343 if self.variants_chrom is None: 344 warnings.warn("Chromosome data `variants_chrom` is None.") 345 return None 346 347 return len(self.unique_chrom) 348 349 @property 350 def n_ancestries(self) -> int: 351 """ 352 Retrieve `n_ancestries`. 353 354 Returns: 355 **int:** The total number of unique ancestries. 356 """ 357 if self.__calldata_lai is not None: 358 return len(np.unique(self.__calldata_lai)) 359 else: 360 raise ValueError("Unable to determine the total number of ancestries: no relevant data is available.") 361 362 @property 363 def unique_chrom(self) -> Optional[np.ndarray]: 364 """ 365 Retrieve `unique_chrom`. 366 367 Returns: 368 **array:** The unique chromosome names in `variants_chrom`, preserving their order of appearance. 369 """ 370 if self.variants_chrom is None: 371 warnings.warn("Chromosome data `variants_chrom` is None.") 372 return None 373 374 # Identify unique chromosome names and their first indexes of occurrence 375 _, idx = np.unique(self.variants_chrom, return_index=True) 376 # Return chromosome names sorted by their first occurrence to maintain original order 377 return self.variants_chrom[np.sort(idx)] 378 379 @property 380 def are_strands_summed(self) -> bool: 381 """ 382 Retrieve `are_strands_summed`. 383 384 Returns: 385 **bool:** 386 True if the maternal and paternal strands have been summed together, which is indicated by 387 `calldata_gt` having shape `(n_samples, n_snps)`. False if the strands are stored separately, 388 indicated by `calldata_gt` having shape `(n_samples, n_snps, 2)`. 389 """ 390 if self.calldata_gt is None: 391 warnings.warn("Genotype data `calldata_gt` is None.") 392 return None 393 394 return self.calldata_gt.ndim == 2 395 396 def copy(self) -> 'SNPObject': 397 """ 398 Create and return a copy of `self`. 399 400 Returns: 401 **SNPObject:** 402 A new instance of the current object. 403 """ 404 return copy.deepcopy(self) 405 406 def keys(self) -> List[str]: 407 """ 408 Retrieve a list of public attribute names for `self`. 409 410 Returns: 411 **list of str:** 412 A list of attribute names, with internal name-mangling removed, 413 for easier reference to public attributes in the instance. 414 """ 415 return [attr.replace('_SNPObject__', '') for attr in vars(self)] 416 417 def filter_variants( 418 self, 419 chrom: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 420 pos: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 421 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 422 include: bool = True, 423 inplace: bool = False 424 ) -> Optional['SNPObject']: 425 """ 426 Filter variants based on specified chromosome names, variant positions, or variant indexes. 427 428 This method updates the `calldata_gt`, `variants_ref`, `variants_alt`, 429 `variants_chrom`, `variants_filter_pass`, `variants_id`, `variants_pos`, 430 `variants_qual`, and `lai` attributes to include or exclude the specified variants. The filtering 431 criteria can be based on chromosome names, variant positions, or indexes. If multiple 432 criteria are provided, their union is used for filtering. The order of the variants is preserved. 433 434 Negative indexes are supported and follow 435 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 436 437 Args: 438 chrom (str or array_like of str, optional): 439 Chromosome(s) to filter variants by. Can be a single chromosome as a string or a sequence 440 of chromosomes. If both `chrom` and `pos` are provided, they must either have matching lengths 441 (pairing each chromosome with a position) or `chrom` should be a single value that applies to 442 all positions in `pos`. Default is None. 443 pos (int or array_like of int, optional): 444 Position(s) to filter variants by. Can be a single position as an integer or a sequence of positions. 445 If `chrom` is also provided, `pos` should either match `chrom` in length or `chrom` should be a 446 single value. Default is None. 447 indexes (int or array_like of int, optional): 448 Index(es) of the variants to include or exclude. Can be a single index or a sequence 449 of indexes. Negative indexes are supported. Default is None. 450 include (bool, default=True): 451 If True, includes only the specified variants. If False, excludes the specified 452 variants. Default is True. 453 inplace (bool, default=False): 454 If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 455 filtered. Default is False. 456 457 Returns: 458 **Optional[SNPObject]:** 459 A new `SNPObject` with the specified variants filtered if `inplace=False`. 460 If `inplace=True`, modifies `self` in place and returns None. 461 """ 462 if chrom is None and pos is None and indexes is None: 463 raise ValueError("At least one of 'chrom', 'pos', or 'indexes' must be provided.") 464 465 n_snps = self.n_snps 466 467 # Convert inputs to arrays for consistency 468 chrom = np.atleast_1d(chrom) if chrom is not None else None 469 pos = np.atleast_1d(pos) if pos is not None else None 470 indexes = np.atleast_1d(indexes) if indexes is not None else None 471 472 # Validate chrom and pos lengths if both are provided 473 if chrom is not None and pos is not None: 474 if len(chrom) != len(pos) and len(chrom) > 1: 475 raise ValueError( 476 "When both 'chrom' and 'pos' are provided, they must either be of the same length " 477 "or 'chrom' must be a single value." 478 ) 479 480 # Create a mask for chromosome and position filtering 481 mask_combined = np.zeros(n_snps, dtype=bool) 482 if chrom is not None and pos is not None: 483 if len(chrom) == 1: 484 # Apply single chromosome to all positions in `pos` 485 mask_combined = (self['variants_chrom'] == chrom[0]) & np.isin(self['variants_pos'], pos) 486 else: 487 # Vectorized pair matching for chrom and pos 488 query_pairs = np.array( 489 list(zip(chrom, pos)), 490 dtype=[ 491 ('chrom', self['variants_chrom'].dtype), 492 ('pos', self['variants_pos'].dtype) 493 ] 494 ) 495 data_pairs = np.array( 496 list(zip(self['variants_chrom'], self['variants_pos'])), 497 dtype=[ 498 ('chrom', self['variants_chrom'].dtype), 499 ('pos', self['variants_pos'].dtype) 500 ] 501 ) 502 mask_combined = np.isin(data_pairs, query_pairs) 503 504 elif chrom is not None: 505 # Only chromosome filtering 506 mask_combined = np.isin(self['variants_chrom'], chrom) 507 elif pos is not None: 508 # Only position filtering 509 mask_combined = np.isin(self['variants_pos'], pos) 510 511 # Create mask based on indexes if provided 512 if indexes is not None: 513 # Validate indexes, allowing negative indexes 514 out_of_bounds_indexes = indexes[(indexes < -n_snps) | (indexes >= n_snps)] 515 if out_of_bounds_indexes.size > 0: 516 raise ValueError(f"One or more sample indexes are out of bounds.") 517 518 # Handle negative indexes and check for out-of-bounds indexes 519 adjusted_indexes = np.mod(indexes, n_snps) 520 521 # Create mask for specified indexes 522 mask_indexes = np.zeros(n_snps, dtype=bool) 523 mask_indexes[adjusted_indexes] = True 524 525 # Combine with `chrom` and `pos` mask using logical OR (union of all specified criteria) 526 mask_combined = mask_combined | mask_indexes 527 528 # Invert mask if `include` is False 529 if not include: 530 mask_combined = ~mask_combined 531 532 # Define keys to filter 533 keys = [ 534 'calldata_gt', 'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 535 'variants_id', 'variants_pos', 'variants_qual', 'calldata_lai' 536 ] 537 538 # Apply filtering based on inplace parameter 539 if inplace: 540 for key in keys: 541 if self[key] is not None: 542 if self[key].ndim > 1: 543 self[key] = np.asarray(self[key])[mask_combined, ...] 544 else: 545 self[key] = np.asarray(self[key])[mask_combined] 546 547 return None 548 else: 549 # Create A new `SNPObject` with filtered data 550 snpobj = self.copy() 551 for key in keys: 552 if snpobj[key] is not None: 553 if snpobj[key].ndim > 1: 554 snpobj[key] = np.asarray(snpobj[key])[mask_combined, ...] 555 else: 556 snpobj[key] = np.asarray(snpobj[key])[mask_combined] 557 558 return snpobj 559 560 def filter_samples( 561 self, 562 samples: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 563 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 564 include: bool = True, 565 inplace: bool = False 566 ) -> Optional['SNPObject']: 567 """ 568 Filter samples based on specified names or indexes. 569 570 This method updates the `samples` and `calldata_gt` attributes to include or exclude the specified 571 samples. The order of the samples is preserved. 572 573 If both samples and indexes are provided, any sample matching either a name in samples or an index in 574 indexes will be included or excluded. 575 576 This method allows inclusion or exclusion of specific samples by their names or 577 indexes. When both sample names and indexes are provided, the union of the specified samples 578 is used. Negative indexes are supported and follow 579 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 580 581 Args: 582 samples (str or array_like of str, optional): 583 Name(s) of the samples to include or exclude. Can be a single sample name or a 584 sequence of sample names. Default is None. 585 indexes (int or array_like of int, optional): 586 Index(es) of the samples to include or exclude. Can be a single index or a sequence 587 of indexes. Negative indexes are supported. Default is None. 588 include (bool, default=True): 589 If True, includes only the specified samples. If False, excludes the specified 590 samples. Default is True. 591 inplace (bool, default=False): 592 If True, modifies `self` in place. If False, returns a new `SNPObject` with the samples 593 filtered. Default is False. 594 595 Returns: 596 **Optional[SNPObject]:** 597 A new `SNPObject` with the specified samples filtered if `inplace=False`. 598 If `inplace=True`, modifies `self` in place and returns None. 599 """ 600 if samples is None and indexes is None: 601 raise ValueError("At least one of 'samples' or 'indexes' must be provided.") 602 603 n_samples = self.n_samples 604 sample_names = np.array(self['samples']) 605 606 # Create mask based on sample names 607 if samples is not None: 608 samples = np.atleast_1d(samples) 609 mask_samples = np.isin(sample_names, samples) 610 missing_samples = samples[~np.isin(samples, sample_names)] 611 if missing_samples.size > 0: 612 raise ValueError(f"The following specified samples were not found: {missing_samples.tolist()}") 613 else: 614 mask_samples = np.zeros(n_samples, dtype=bool) 615 616 # Create mask based on sample indexes 617 if indexes is not None: 618 indexes = np.atleast_1d(indexes) 619 620 # Validate indexes, allowing negative indexes 621 out_of_bounds_indexes = indexes[(indexes < -n_samples) | (indexes >= n_samples)] 622 if out_of_bounds_indexes.size > 0: 623 raise ValueError(f"One or more sample indexes are out of bounds.") 624 625 # Handle negative indexes 626 adjusted_indexes = np.mod(indexes, n_samples) 627 628 mask_indexes = np.zeros(n_samples, dtype=bool) 629 mask_indexes[adjusted_indexes] = True 630 else: 631 mask_indexes = np.zeros(n_samples, dtype=bool) 632 633 # Combine masks using logical OR (union of samples) 634 mask_combined = mask_samples | mask_indexes 635 636 if not include: 637 mask_combined = ~mask_combined 638 639 # Define keys to filter 640 keys = ['samples', 'calldata_gt', 'calldata_lai'] 641 642 # Apply filtering based on inplace parameter 643 if inplace: 644 for key in keys: 645 if self[key] is not None: 646 if self[key].ndim > 1: 647 self[key] = np.asarray(self[key])[:, mask_combined, ...] 648 else: 649 self[key] = np.asarray(self[key])[mask_combined] 650 651 return None 652 else: 653 # Create A new `SNPObject` with filtered data 654 snpobj = self.copy() 655 for key in keys: 656 if snpobj[key] is not None: 657 if snpobj[key].ndim > 1: 658 snpobj[key] = np.asarray(snpobj[key])[:, mask_combined, ...] 659 else: 660 snpobj[key] = np.asarray(snpobj[key])[mask_combined] 661 return snpobj 662 663 def detect_chromosome_format(self) -> str: 664 """ 665 Detect the chromosome naming convention in `variants_chrom` based on the prefix 666 of the first chromosome identifier in `unique_chrom`. 667 668 **Recognized formats:** 669 670 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 671 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 672 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 673 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 674 675 If the format does not match any recognized pattern, `'Unknown format'` is returned. 676 677 Returns: 678 **str:** 679 A string indicating the detected chromosome format (`'chr'`, `'chm'`, `'chrom'`, or `'plain'`). 680 If no recognized format is matched, returns `'Unknown format'`. 681 """ 682 # Select the first unique chromosome identifier for format detection 683 chromosome_str = self.unique_chrom[0] 684 685 # Define regular expressions to match each recognized chromosome format 686 patterns = { 687 'chr': r'^chr(\d+|X|Y|M)$', # Matches 'chr' prefixed format 688 'chm': r'^chm(\d+|X|Y|M)$', # Matches 'chm' prefixed format 689 'chrom': r'^chrom(\d+|X|Y|M)$', # Matches 'chrom' prefixed format 690 'plain': r'^(\d+|X|Y|M)$' # Matches plain format without prefix 691 } 692 693 # Iterate through the patterns to identify the chromosome format 694 for prefix, pattern in patterns.items(): 695 if re.match(pattern, chromosome_str): 696 return prefix # Return the recognized format prefix 697 698 # If no pattern matches, return 'Unknown format' 699 return 'Unknown format' 700 701 def convert_chromosome_format( 702 self, 703 from_format: str, 704 to_format: str, 705 inplace: bool = False 706 ) -> Optional['SNPObject']: 707 """ 708 Convert the chromosome format from one naming convention to another in `variants_chrom`. 709 710 **Supported formats:** 711 712 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 713 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 714 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 715 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 716 717 Args: 718 from_format (str): 719 The current chromosome format. Acceptable values are `'chr'`, `'chm'`, `'chrom'`, or `'plain'`. 720 to_format (str): 721 The target format for chromosome data conversion. Acceptable values match `from_format` options. 722 inplace (bool, default=False): 723 If True, modifies `self` in place. If False, returns a new `SNPObject` with the converted format. 724 Default is False. 725 726 Returns: 727 **Optional[SNPObject]:** A new `SNPObject` with the converted chromosome format if `inplace=False`. 728 If `inplace=True`, modifies `self` in place and returns None. 729 """ 730 # Define the list of standard chromosome identifiers 731 chrom_list = [*map(str, range(1, 23)), 'X', 'Y', 'M'] # M for mitochondrial chromosomes 732 733 # Format mappings for different chromosome naming conventions 734 format_mappings = { 735 'chr': [f'chr{i}' for i in chrom_list], 736 'chm': [f'chm{i}' for i in chrom_list], 737 'chrom': [f'chrom{i}' for i in chrom_list], 738 'plain': chrom_list, 739 } 740 741 # Verify that from_format and to_format are valid naming conventions 742 if from_format not in format_mappings or to_format not in format_mappings: 743 raise ValueError(f"Invalid format: {from_format} or {to_format}. Must be one of {list(format_mappings.keys())}.") 744 745 # Convert chromosomes to string for consistent comparison 746 variants_chrom = self['variants_chrom'].astype(str) 747 748 # Verify that all chromosomes in the object follow the specified `from_format` 749 expected_chroms = set(format_mappings[from_format]) 750 mismatched_chroms = set(variants_chrom) - expected_chroms 751 752 if mismatched_chroms: 753 raise ValueError(f"The following chromosomes do not match the `from_format` '{from_format}': {mismatched_chroms}.") 754 755 # Create conditions for selecting based on current `from_format` names 756 conditions = [variants_chrom == chrom for chrom in format_mappings[from_format]] 757 758 # Rename chromosomes based on inplace flag 759 if inplace: 760 self['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown') 761 return None 762 else: 763 snpobject = self.copy() 764 snpobject['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown') 765 return snpobject 766 767 def match_chromosome_format(self, snpobj: 'SNPObject', inplace: bool = False) -> Optional['SNPObject']: 768 """ 769 Convert the chromosome format in `variants_chrom` from `self` to match the format of a reference `snpobj`. 770 771 **Recognized formats:** 772 773 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 774 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 775 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 776 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 777 778 Args: 779 snpobj (SNPObject): 780 The reference SNPObject whose chromosome format will be matched. 781 inplace (bool, default=False): 782 If True, modifies `self` in place. If False, returns a new `SNPObject` with the 783 chromosome format matching that of `snpobj`. Default is False. 784 785 Returns: 786 **Optional[SNPObject]:** 787 A new `SNPObject` with matched chromosome format if `inplace=False`. 788 If `inplace=True`, modifies `self` in place and returns None. 789 """ 790 # Detect the chromosome naming format of the current SNPObject 791 fmt1 = self.detect_chromosome_format() 792 if fmt1 == 'Unknown format': 793 raise ValueError("The chromosome format of the current SNPObject is unrecognized.") 794 795 # Detect the chromosome naming format of the reference SNPObject 796 fmt2 = snpobj.detect_chromosome_format() 797 if fmt2 == 'Unknown format': 798 raise ValueError("The chromosome format of the reference SNPObject is unrecognized.") 799 800 # Convert the current SNPObject's chromosome format to match the reference format 801 return self.convert_chromosome_format(fmt1, fmt2, inplace=inplace) 802 803 def rename_chrom( 804 self, 805 to_replace: Union[Dict[str, str], str, List[str]] = {'^([0-9]+)$': r'chr\1', r'^chr([0-9]+)$': r'\1'}, 806 value: Optional[Union[str, List[str]]] = None, 807 regex: bool = True, 808 inplace: bool = False 809 ) -> Optional['SNPObject']: 810 """ 811 Replace chromosome values in `variants_chrom` using patterns or exact matches. 812 813 This method allows flexible chromosome replacements, using regex or exact matches, useful 814 for non-standard chromosome formats. For standard conversions (e.g., 'chr1' to '1'), 815 consider `convert_chromosome_format`. 816 817 Args: 818 to_replace (dict, str, or list of str): 819 Pattern(s) or exact value(s) to be replaced in chromosome names. Default behavior 820 transforms `<chrom_num>` to `chr<chrom_num>` or vice versa. Non-matching values 821 remain unchanged. 822 - If str or list of str: Matches will be replaced with `value`. 823 - If regex (bool), then any regex matches will be replaced with `value`. 824 - If dict: Keys defines values to replace, with corresponding replacements as values. 825 value (str or list of str, optional): 826 Replacement value(s) if `to_replace` is a string or list. Ignored if `to_replace` 827 is a dictionary. 828 regex (bool, default=True): 829 If True, interprets `to_replace` keys as regex patterns. 830 inplace (bool, default=False): 831 If True, modifies `self` in place. If False, returns a new `SNPObject` with the chromosomes 832 renamed. Default is False. 833 834 Returns: 835 **Optional[SNPObject]:** A new `SNPObject` with the renamed chromosome format if `inplace=False`. 836 If `inplace=True`, modifies `self` in place and returns None. 837 """ 838 # Standardize input format: convert `to_replace` and `value` to a dictionary if needed 839 if isinstance(to_replace, (str, int)): 840 to_replace = [to_replace] 841 if isinstance(value, (str, int)): 842 value = [value] 843 if isinstance(to_replace, list) and isinstance(value, list): 844 dictionary = dict(zip(to_replace, value)) 845 elif isinstance(to_replace, dict) and value is None: 846 dictionary = to_replace 847 else: 848 raise ValueError( 849 "Invalid input: `to_replace` and `value` must be compatible types (both str, list of str, or dict)." 850 ) 851 852 # Vectorized function for replacing values in chromosome array 853 vec_replace_values = np.vectorize(self._match_to_replace) 854 855 # Rename chromosomes based on inplace flag 856 if inplace: 857 self.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex) 858 return None 859 else: 860 snpobj = self.copy() 861 snpobj.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex) 862 return snpobj 863 864 def rename_missings( 865 self, 866 before: Union[int, float, str] = -1, 867 after: Union[int, float, str] = '.', 868 inplace: bool = False 869 ) -> Optional['SNPObject']: 870 """ 871 Replace missing values in the `calldata_gt` attribute. 872 873 This method identifies missing values in 'calldata_gt' and replaces them with a specified 874 value. By default, it replaces occurrences of `-1` (often used to signify missing data) with `'.'`. 875 876 Args: 877 before (int, float, or str, default=-1): 878 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 879 Default is -1. 880 after (int, float, or str, default='.'): 881 The value that will replace `before`. Default is '.'. 882 inplace (bool, default=False): 883 If True, modifies `self` in place. If False, returns a new `SNPObject` with the applied 884 replacements. Default is False. 885 886 Returns: 887 **Optional[SNPObject]:** 888 A new `SNPObject` with the renamed missing values if `inplace=False`. 889 If `inplace=True`, modifies `self` in place and returns None. 890 """ 891 # Rename missing values in the `calldata_gt` attribute based on inplace flag 892 if inplace: 893 self['calldata_gt'] = np.where(self['calldata_gt'] == before, after, self['calldata_gt']) 894 return None 895 else: 896 snpobj = self.copy() 897 snpobj['calldata_gt'] = np.where(snpobj['calldata_gt'] == before, after, snpobj['calldata_gt']) 898 return snpobj 899 900 def get_common_variants_intersection( 901 self, 902 snpobj: 'SNPObject', 903 index_by: str = 'pos' 904 ) -> Tuple[List[str], np.ndarray, np.ndarray]: 905 """ 906 Identify common variants between `self` and the `snpobj` instance based on the specified `index_by` criterion, 907 which may match based on chromosome and position (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both. 908 909 This method returns the identifiers of common variants and their corresponding indices in both objects. 910 911 Args: 912 snpobj (SNPObject): 913 The reference SNPObject to compare against. 914 index_by (str, default='pos'): 915 Criteria for matching variants. Options: 916 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 917 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 918 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 919 Default is 'pos'. 920 921 Returns: 922 Tuple containing: 923 - **list of str:** A list of common variant identifiers (as strings). 924 - **array:** An array of indices in `self` where common variants are located. 925 - **array:** An array of indices in `snpobj` where common variants are located. 926 """ 927 # Create unique identifiers for each variant in both SNPObjects based on the specified criterion 928 if index_by == 'pos': 929 query_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(self['variants_chrom'], self['variants_pos'])] 930 reference_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(snpobj['variants_chrom'], snpobj['variants_pos'])] 931 elif index_by == 'id': 932 query_identifiers = self['variants_id'].tolist() 933 reference_identifiers = snpobj['variants_id'].tolist() 934 elif index_by == 'pos+id': 935 query_identifiers = [ 936 f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(self['variants_chrom'], self['variants_pos'], self['variants_id']) 937 ] 938 reference_identifiers = [ 939 f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_id']) 940 ] 941 else: 942 raise ValueError("`index_by` must be one of 'pos', 'id', or 'pos+id'.") 943 944 # Convert to sets for intersection 945 common_ids = set(query_identifiers).intersection(reference_identifiers) 946 947 # Collect indices for common identifiers 948 query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids] 949 reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids] 950 951 return list(common_ids), np.array(query_idx), np.array(reference_idx) 952 953 def get_common_markers_intersection( 954 self, 955 snpobj: 'SNPObject' 956 ) -> Tuple[List[str], np.ndarray, np.ndarray]: 957 """ 958 Identify common markers between between `self` and the `snpobj` instance. Common markers are identified 959 based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 960 and alternate (`variants_alt`) alleles. 961 962 This method returns the identifiers of common markers and their corresponding indices in both objects. 963 964 Args: 965 snpobj (SNPObject): 966 The reference SNPObject to compare against. 967 968 Returns: 969 Tuple containing: 970 - **list of str:** A list of common variant identifiers (as strings). 971 - **array:** An array of indices in `self` where common variants are located. 972 - **array:** An array of indices in `snpobj` where common variants are located. 973 """ 974 # Generate unique identifiers based on chrom, pos, ref, and alt alleles 975 query_identifiers = [ 976 f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 977 zip(self['variants_chrom'], self['variants_pos'], self['variants_ref'], self['variants_alt']) 978 ] 979 reference_identifiers = [ 980 f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 981 zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_ref'], snpobj['variants_alt']) 982 ] 983 984 # Convert to sets for intersection 985 common_ids = set(query_identifiers).intersection(reference_identifiers) 986 987 # Collect indices for common identifiers in both SNPObjects 988 query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids] 989 reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids] 990 991 return list(common_ids), np.array(query_idx), np.array(reference_idx) 992 993 def subset_to_common_variants( 994 self, 995 snpobj: 'SNPObject', 996 index_by: str = 'pos', 997 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 998 inplace: bool = False 999 ) -> Optional['SNPObject']: 1000 """ 1001 Subset `self` to include only the common variants with a reference `snpobj` based on 1002 the specified `index_by` criterion, which may match based on chromosome and position 1003 (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both. 1004 1005 Args: 1006 snpobj (SNPObject): 1007 The reference SNPObject to compare against. 1008 index_by (str, default='pos'): 1009 Criteria for matching variants. Options: 1010 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1011 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1012 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1013 Default is 'pos'. 1014 common_variants_intersection (Tuple[np.ndarray, np.ndarray], optional): 1015 Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 1016 computed within the function. 1017 inplace (bool, default=False): 1018 If True, modifies `self` in place. If False, returns a new `SNPObject` with the common variants 1019 subsetted. Default is False. 1020 1021 Returns: 1022 **Optional[SNPObject]:** 1023 Optional[SNPObject]: A new `SNPObject` with the common variants subsetted if `inplace=False`. 1024 If `inplace=True`, modifies `self` in place and returns None. 1025 """ 1026 # Get indices of common variants if not provided 1027 if common_variants_intersection is None: 1028 _, query_idx, _ = self.get_common_variants_intersection(snpobj, index_by=index_by) 1029 else: 1030 query_idx, _ = common_variants_intersection 1031 1032 # Use filter_variants method with the identified indices, applying `inplace` as specified 1033 return self.filter_variants(indexes=query_idx, include=True, inplace=inplace) 1034 1035 def subset_to_common_markers( 1036 self, 1037 snpobj: 'SNPObject', 1038 common_markers_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1039 inplace: bool = False 1040 ) -> Optional['SNPObject']: 1041 """ 1042 Subset `self` to include only the common markers with a reference `snpobj`. Common markers are identified 1043 based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 1044 and alternate (`variants_alt`) alleles. 1045 1046 Args: 1047 snpobj (SNPObject): 1048 The reference SNPObject to compare against. 1049 common_markers_intersection (tuple of arrays, optional): 1050 Precomputed indices of common markers between `self` and `snpobj`. If None, intersection is 1051 computed within the function. 1052 inplace (bool, default=False): 1053 If True, modifies `self` in place. If False, returns a new `SNPObject` with the common markers 1054 subsetted. Default is False. 1055 1056 Returns: 1057 **SNPObject:** 1058 **Optional[SNPObject]:** A new `SNPObject` with the common markers subsetted if `inplace=False`. 1059 If `inplace=True`, modifies `self` in place and returns None. 1060 """ 1061 # Get indices of common markers if not provided 1062 if common_markers_intersection is None: 1063 _, query_idx, _ = self.get_common_markers_intersection(snpobj) 1064 else: 1065 query_idx, _ = common_markers_intersection 1066 1067 # Use filter_variants method with the identified indices, applying `inplace` as specified 1068 return self.filter_variants(indexes=query_idx, include=True, inplace=inplace) 1069 1070 def remove_strand_ambiguous_variants(self, inplace: bool = False) -> Optional['SNPObject']: 1071 """ 1072 A strand-ambiguous variant has reference (`variants_ref`) and alternate (`variants_alt`) alleles 1073 in the pairs A/T, T/A, C/G, or G/C, where both alleles are complementary and thus indistinguishable 1074 in terms of strand orientation. 1075 1076 Args: 1077 inplace (bool, default=False): 1078 If True, modifies `self` in place. If False, returns a new `SNPObject` with the 1079 strand-ambiguous variants removed. Default is False. 1080 1081 Returns: 1082 **Optional[SNPObject]:** A new `SNPObject` with non-ambiguous variants only if `inplace=False`. 1083 If `inplace=True`, modifies `self` in place and returns None. 1084 """ 1085 # Identify strand-ambiguous SNPs using vectorized comparisons 1086 is_AT = (self['variants_ref'] == 'A') & (self['variants_alt'] == 'T') 1087 is_TA = (self['variants_ref'] == 'T') & (self['variants_alt'] == 'A') 1088 is_CG = (self['variants_ref'] == 'C') & (self['variants_alt'] == 'G') 1089 is_GC = (self['variants_ref'] == 'G') & (self['variants_alt'] == 'C') 1090 1091 # Create a combined mask for all ambiguous variants 1092 ambiguous_mask = is_AT | is_TA | is_CG | is_GC 1093 non_ambiguous_idx = np.where(~ambiguous_mask)[0] 1094 1095 # Count each type of ambiguity using numpy's sum on boolean arrays 1096 A_T_count = np.sum(is_AT) 1097 T_A_count = np.sum(is_TA) 1098 C_G_count = np.sum(is_CG) 1099 G_C_count = np.sum(is_GC) 1100 1101 # Log the counts of each type of strand-ambiguous variants 1102 total_ambiguous = A_T_count + T_A_count + C_G_count + G_C_count 1103 log.info(f'{A_T_count} ambiguities of A-T type.') 1104 log.info(f'{T_A_count} ambiguities of T-A type.') 1105 log.info(f'{C_G_count} ambiguities of C-G type.') 1106 log.info(f'{G_C_count} ambiguities of G-C type.') 1107 1108 # Filter out ambiguous variants and keep non-ambiguous ones 1109 log.debug(f'Removing {total_ambiguous} strand-ambiguous variants...') 1110 return self.filter_variants(indexes=non_ambiguous_idx, include=True, inplace=inplace) 1111 1112 def correct_flipped_variants( 1113 self, 1114 snpobj: 'SNPObject', 1115 check_complement: bool = True, 1116 index_by: str = 'pos', 1117 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1118 log_stats: bool = True, 1119 inplace: bool = False 1120 ) -> Optional['SNPObject']: 1121 """ 1122 Correct flipped variants between between `self` and a reference `snpobj`, where reference (`variants_ref`) 1123 and alternate (`variants_alt`) alleles are swapped. 1124 1125 **Flip Detection Based on `check_complement`:** 1126 1127 - If `check_complement=False`, only direct allele swaps are considered: 1128 1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1129 1130 - If `check_complement=True`, both direct and complementary swaps are considered, with four possible cases: 1131 1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1132 2. **Complement Swap of Ref:** `complement(self.variants_ref) == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1133 3. **Complement Swap of Alt:** `self.variants_ref == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`. 1134 4. **Complement Swap of both Ref and Alt:** `complement(self.variants_ref) == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`. 1135 1136 **Note:** Variants where `self.variants_ref == self.variants_alt` are ignored as they are ambiguous. 1137 1138 **Correction Process:** 1139 - Swaps `variants_ref` and `variants_alt` alleles in `self` to align with `snpobj`. 1140 - Flips `calldata_gt` values (0 becomes 1, and 1 becomes 0) to match the updated allele configuration. 1141 1142 Args: 1143 snpobj (SNPObject): 1144 The reference SNPObject to compare against. 1145 check_complement (bool, default=True): 1146 If True, also checks for complementary base pairs (A/T, T/A, C/G, and G/C) when identifying swapped variants. 1147 Default is True. 1148 index_by (str, default='pos'): 1149 Criteria for matching variants. Options: 1150 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1151 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1152 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1153 Default is 'pos'. 1154 common_variants_intersection (tuple of arrays, optional): 1155 Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 1156 computed within the function. 1157 log_stats (bool, default=True): 1158 If True, logs statistical information about matching and ambiguous alleles. Default is True. 1159 inplace (bool, default=False): 1160 If True, modifies `self` in place. If False, returns a new `SNPObject` with corrected 1161 flips. Default is False. 1162 1163 Returns: 1164 **Optional[SNPObject]**: 1165 A new `SNPObject` with corrected flips if `inplace=False`. 1166 If `inplace=True`, modifies `self` in place and returns None. 1167 """ 1168 # Define complement mappings for nucleotides 1169 complement_map = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'} 1170 1171 # Helper function to get the complement of a base 1172 def get_complement(base: str) -> str: 1173 return complement_map.get(base, base) 1174 1175 # Get common variant indices if not provided 1176 if common_variants_intersection != None: 1177 query_idx, reference_idx = common_variants_intersection 1178 else: 1179 _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by) 1180 1181 # Log statistics on matching alleles if enabled 1182 if log_stats: 1183 matching_ref = np.sum(self['variants_ref'][query_idx] == snpobj['variants_ref'][reference_idx]) 1184 matching_alt = np.sum(self['variants_alt'][query_idx] == snpobj['variants_alt'][reference_idx]) 1185 ambiguous = np.sum(self['variants_ref'][query_idx] == self['variants_alt'][query_idx]) 1186 log.info(f"Matching reference alleles (ref=ref'): {matching_ref}, Matching alternate alleles (alt=alt'): {matching_alt}.") 1187 log.info(f"Number of ambiguous alleles (ref=alt): {ambiguous}.") 1188 1189 # Identify indices where `ref` and `alt` alleles are swapped 1190 if not check_complement: 1191 # Simple exact match for swapped alleles 1192 swapped_ref = (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) 1193 swapped_alt = (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) 1194 else: 1195 # Check for swapped or complementary-swapped alleles 1196 swapped_ref = ( 1197 (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) | 1198 (np.vectorize(get_complement)(self['variants_ref'][query_idx]) == snpobj['variants_alt'][reference_idx]) 1199 ) 1200 swapped_alt = ( 1201 (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) | 1202 (np.vectorize(get_complement)(self['variants_alt'][query_idx]) == snpobj['variants_ref'][reference_idx]) 1203 ) 1204 1205 # Filter out ambiguous variants where `ref` and `alt` alleles match (ref=alt) 1206 not_ambiguous = (self['variants_ref'][query_idx] != self['variants_alt'][query_idx]) 1207 1208 # Indices in `self` of flipped variants 1209 flip_idx_query = query_idx[swapped_ref & swapped_alt & not_ambiguous] 1210 1211 # Correct the identified variant flips 1212 if len(flip_idx_query) > 0: 1213 log.info(f'Correcting {len(flip_idx_query)} variant flips...') 1214 1215 temp_alts = self['variants_alt'][flip_idx_query] 1216 temp_refs = self['variants_ref'][flip_idx_query] 1217 1218 # Correct the variant flips based on whether the operation is in-place or not 1219 if inplace: 1220 self['variants_alt'][flip_idx_query] = temp_refs 1221 self['variants_ref'][flip_idx_query] = temp_alts 1222 self['calldata_gt'][flip_idx_query] = 1 - self['calldata_gt'][flip_idx_query] 1223 return None 1224 else: 1225 snpobj = self.copy() 1226 snpobj['variants_alt'][flip_idx_query] = temp_refs 1227 snpobj['variants_ref'][flip_idx_query] = temp_alts 1228 snpobj['calldata_gt'][flip_idx_query] = 1 - snpobj['calldata_gt'][flip_idx_query] 1229 return snpobj 1230 else: 1231 log.info('No variant flips found to correct.') 1232 return self if not inplace else None 1233 1234 def remove_mismatching_variants( 1235 self, 1236 snpobj: 'SNPObject', 1237 index_by: str = 'pos', 1238 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1239 inplace: bool = False 1240 ) -> Optional['SNPObject']: 1241 """ 1242 Remove variants from `self`, where reference (`variants_ref`) and/or alternate (`variants_alt`) alleles 1243 do not match with a reference `snpobj`. 1244 1245 Args: 1246 snpobj (SNPObject): 1247 The reference SNPObject to compare against. 1248 index_by (str, default='pos'): 1249 Criteria for matching variants. Options: 1250 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1251 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1252 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1253 Default is 'pos'. 1254 common_variants_intersection (tuple of arrays, optional): 1255 Precomputed indices of common variants between `self` and the reference `snpobj`. 1256 If None, the intersection is computed within the function. 1257 inplace (bool, default=False): 1258 If True, modifies `self` in place. If False, returns a new `SNPObject` without 1259 mismatching variants. Default is False. 1260 1261 Returns: 1262 **Optional[SNPObject]:** 1263 A new `SNPObject` without mismatching variants if `inplace=False`. 1264 If `inplace=True`, modifies `self` in place and returns None. 1265 """ 1266 # Get common variant indices if not provided 1267 if common_variants_intersection is not None: 1268 query_idx, reference_idx = common_variants_intersection 1269 else: 1270 _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by) 1271 1272 # Vectorized comparison of `ref` and `alt` alleles 1273 ref_mismatch = self['variants_ref'][query_idx] != snpobj['variants_ref'][reference_idx] 1274 alt_mismatch = self['variants_alt'][query_idx] != snpobj['variants_alt'][reference_idx] 1275 mismatch_mask = ref_mismatch | alt_mismatch 1276 1277 # Identify indices in `self` of mismatching variants 1278 mismatch_idx = query_idx[mismatch_mask] 1279 1280 # Compute total number of variant mismatches 1281 total_mismatches = np.sum(mismatch_mask) 1282 1283 # Filter out mismatching variants 1284 log.debug(f'Removing {total_mismatches} mismatching variants...') 1285 return self.filter_variants(indexes=mismatch_idx, include=True, inplace=inplace) 1286 1287 def shuffle_variants(self, inplace: bool = False) -> Optional['SNPObject']: 1288 """ 1289 Randomly shuffle the positions of variants in the SNPObject, ensuring that all associated 1290 data (e.g., `calldata_gt` and variant-specific attributes) remain aligned. 1291 1292 Args: 1293 inplace (bool, default=False): 1294 If True, modifies `self` in place. If False, returns a new `SNPObject` with 1295 shuffled variants. Default is False. 1296 1297 Returns: 1298 **Optional[SNPObject]:** 1299 A new `SNPObject` without shuffled variant positions if `inplace=False`. 1300 If `inplace=True`, modifies `self` in place and returns None. 1301 """ 1302 # Generate a random permutation index for shuffling variant positions 1303 shuffle_index = np.random.permutation(self.n_snps) 1304 1305 # Apply shuffling to all relevant attributes using the class's dictionary-like interface 1306 if inplace: 1307 for key in self.keys(): 1308 if self[key] is not None: 1309 if key == 'calldata_gt': 1310 # `calldata_gt`` has a different shape, so it's shuffled along axis 0 1311 self[key] = self[key][shuffle_index, ...] 1312 elif 'variant' in key: 1313 # Other attributes are 1D arrays 1314 self[key] = np.asarray(self[key])[shuffle_index] 1315 return None 1316 else: 1317 shuffled_snpobj = self.copy() 1318 for key in shuffled_snpobj.keys(): 1319 if shuffled_snpobj[key] is not None: 1320 if key == 'calldata_gt': 1321 shuffled_snpobj[key] = shuffled_snpobj[key][shuffle_index, ...] 1322 elif 'variant' in key: 1323 shuffled_snpobj[key] = np.asarray(shuffled_snpobj[key])[shuffle_index] 1324 return shuffled_snpobj 1325 1326 def set_empty_to_missing(self, inplace: bool = False) -> Optional['SNPObject']: 1327 """ 1328 Replace empty strings `''` with missing values `'.'` in attributes of `self`. 1329 1330 Args: 1331 inplace (bool, default=False): 1332 If True, modifies `self` in place. If False, returns a new `SNPObject` with empty 1333 strings `''` replaced by missing values `'.'`. Default is False. 1334 1335 Returns: 1336 **Optional[SNPObject]:** 1337 A new `SNPObject` with empty strings replaced if `inplace=False`. 1338 If `inplace=True`, modifies `self` in place and returns None. 1339 """ 1340 if inplace: 1341 if self.variants_alt is not None: 1342 self.variants_alt[self.variants_alt == ''] = '.' 1343 if self.variants_ref is not None: 1344 self.variants_ref[self.variants_ref == ''] = '.' 1345 if self.variants_qual is not None: 1346 self.variants_qual = self.variants_qual.astype(str) 1347 self.variants_qual[(self.variants_qual == '') | (self.variants_qual == 'nan')] = '.' 1348 if self.variants_chrom is not None: 1349 self.variants_chrom = self.variants_chrom.astype(str) 1350 self.variants_chrom[self.variants_chrom == ''] = '.' 1351 if self.variants_filter_pass is not None: 1352 self.variants_filter_pass[self.variants_filter_pass == ''] = '.' 1353 if self.variants_id is not None: 1354 self.variants_id[self.variants_id == ''] = '.' 1355 return self 1356 else: 1357 snpobj = self.copy() 1358 if snpobj.variants_alt is not None: 1359 snpobj.variants_alt[snpobj.variants_alt == ''] = '.' 1360 if snpobj.variants_ref is not None: 1361 snpobj.variants_ref[snpobj.variants_ref == ''] = '.' 1362 if snpobj.variants_qual is not None: 1363 snpobj.variants_qual = snpobj.variants_qual.astype(str) 1364 snpobj.variants_qual[(snpobj.variants_qual == '') | (snpobj.variants_qual == 'nan')] = '.' 1365 if snpobj.variants_chrom is not None: 1366 snpobj.variants_chrom[snpobj.variants_chrom == ''] = '.' 1367 if snpobj.variants_filter_pass is not None: 1368 snpobj.variants_filter_pass[snpobj.variants_filter_pass == ''] = '.' 1369 if snpobj.variants_id is not None: 1370 snpobj.variants_id[snpobj.variants_id == ''] = '.' 1371 return snpobj 1372 1373 def convert_to_window_level( 1374 self, 1375 window_size: Optional[int] = None, 1376 physical_pos: Optional[np.ndarray] = None, 1377 chromosomes: Optional[np.ndarray] = None, 1378 window_sizes: Optional[np.ndarray] = None, 1379 laiobj: Optional['LocalAncestryObject'] = None 1380 ) -> 'LocalAncestryObject': 1381 """ 1382 Aggregate the `calldata_lai` attribute into genomic windows within a 1383 `snputils.ancestry.genobj.LocalAncestryObject`. 1384 1385 **Options for defining windows (in order of precedence):** 1386 1387 1. **Fixed window size**: 1388 - Use `window_size` to specify how many SNPs go into each window. The last window on each 1389 chromosome may be larger if SNPs are not evenly divisible by the size. 1390 1391 2. **Custom start and end positions**: 1392 - Provide `physical_pos` (2D array of shape (n_windows, 2)) as the [start, end] base-pair 1393 coordinates for each window. 1394 - If `chromosomes` is not provided and `self` has exactly one chromosome, all windows are 1395 assumed to belong to that chromosome. 1396 - If multiple chromosomes exist but `chromosomes` is missing, an error will be raised. 1397 - Optionally, provide `window_sizes` to store the SNP count per-window. 1398 1399 3. **Matching existing windows**: 1400 - Reuse window definitions (`physical_pos`, `chromosomes`, `window_sizes`) from an existing `laiobj`. 1401 1402 Args: 1403 window_size (int, optional): 1404 Number of SNPs in each window if defining fixed-size windows. If the total number of 1405 SNPs in a chromosome is not evenly divisible by the window size, the last window on that 1406 chromosome will include all remaining SNPs and therefore be larger than the specified size. 1407 physical_pos (array of shape (n_windows, 2), optional): 1408 A 2D array containing the start and end physical positions for each window. 1409 chromosomes (array of shape (n_windows,), optional): 1410 An array with chromosome numbers corresponding to each genomic window. 1411 window_sizes (array of shape (n_windows,), optional): 1412 An array specifying the number of SNPs in each genomic window. 1413 laiobj (LocalAncestryObject, optional): 1414 A reference `LocalAncestryObject` from which to copy existing window definitions. 1415 1416 Returns: 1417 **LocalAncestryObject:** 1418 A LocalAncestryObject containing window-level ancestry data. 1419 """ 1420 from snputils.ancestry.genobj.local import LocalAncestryObject 1421 1422 if window_size is None and physical_pos is None and laiobj is None: 1423 raise ValueError("One of `window_size`, `physical_pos`, or `laiobj` must be provided.") 1424 1425 # 1. Fixed window size 1426 if window_size is not None: 1427 physical_pos = [] # Boundaries [start, end] of each window 1428 chromosomes = [] # Chromosome for each window 1429 window_sizes = [] # Number of SNPs for each window 1430 for chrom in self.unique_chrom: 1431 # Extract indices corresponding to this chromosome 1432 mask_chrom = (self.variants_chrom == chrom) 1433 # Subset to this chromosome 1434 pos_chrom = self.variants_pos[mask_chrom] 1435 # Number of SNPs for this chromosome 1436 n_snps_chrom = pos_chrom.size 1437 1438 # Initialize the start of the first window with the position of the first SNP 1439 current_start = self.variants_pos[0] 1440 1441 # Number of full windows with exactly `window_size` SNPs 1442 n_full_windows = n_snps_chrom // window_size 1443 1444 # Build all but the last window 1445 for i in range(n_full_windows-1): 1446 current_end = self.variants_pos[(i+1) * window_size - 1] 1447 physical_pos.append([current_start, current_end]) 1448 chromosomes.append(chrom) 1449 window_sizes.append(window_size) 1450 current_start = self.variants_pos[(i+1) * window_size] 1451 1452 # Build the last window 1453 current_end = self.variants_pos[-1] 1454 physical_pos.append([current_start, current_end]) 1455 chromosomes.append(chrom) 1456 window_sizes.append(n_snps_chrom - ((n_full_windows - 1) * window_size)) 1457 1458 physical_pos = np.array(physical_pos) 1459 chromosomes = np.array(chromosomes) 1460 window_sizes = np.array(window_sizes) 1461 1462 # 2. Custom start and end positions 1463 elif physical_pos is not None: 1464 # Check if there is exactly one chromosome 1465 if chromosomes is None: 1466 unique_chrom = self.unique_chrom 1467 if len(unique_chrom) == 1: 1468 # We assume all windows belong to this single chromosome 1469 single_chrom = unique_chrom[0] 1470 chromosomes = np.array([single_chrom] * physical_pos.shape[0]) 1471 else: 1472 raise ValueError("Multiple chromosomes detected, but `chromosomes` was not provided.") 1473 1474 # 3. Match existing windows to a reference laiobj 1475 elif laiobj is not None: 1476 physical_pos = laiobj.physical_pos 1477 chromosomes = laiobj.chromosomes 1478 window_sizes = laiobj.window_sizes 1479 1480 # Allocate an output LAI array 1481 n_windows = physical_pos.shape[0] 1482 n_samples = self.n_samples 1483 if len(self.calldata_lai.shape) == 3: 1484 lai = np.zeros((n_windows, n_samples, 2)) 1485 else: 1486 lai = np.zeros((n_windows, n_samples*2)) 1487 1488 # For each window, find the relevant SNPs and compute the mode of the ancestries 1489 for i, ((start, end), chrom) in enumerate(zip(physical_pos, chromosomes)): 1490 snps_mask = ( 1491 (self.variants_chrom == chrom) & 1492 (self.variants_pos >= start) & 1493 (self.variants_pos <= end) 1494 ) 1495 if np.any(snps_mask): 1496 lai_mask = self.calldata_lai[snps_mask, ...] 1497 mode_ancestries = mode(lai_mask, axis=0, nan_policy='omit').mode 1498 lai[i] = mode_ancestries 1499 else: 1500 lai[i] = np.nan 1501 1502 # Generate haplotype labels, e.g. "Sample1.0", "Sample1.1" 1503 haplotypes = [f"{sample}.{i}" for sample in self.samples for i in range(2)] 1504 1505 # If original data was (n_snps, n_samples, 2), flatten to (n_windows, n_samples*2) 1506 if len(self.calldata_lai.shape) == 3: 1507 lai = lai.reshape(n_windows, -1) 1508 1509 # Aggregate into a LocalAncestryObject 1510 return LocalAncestryObject( 1511 haplotypes=haplotypes, 1512 lai=lai, 1513 samples=self.samples, 1514 ancestry_map=self.ancestry_map, 1515 window_sizes=window_sizes, 1516 physical_pos=physical_pos, 1517 chromosomes=chromosomes 1518 ) 1519 1520 def save(self, file: Union[str, Path]) -> None: 1521 """ 1522 Save the data stored in `self` to a specified file. 1523 1524 The format of the saved file is determined by the file extension provided in the `file` 1525 argument. 1526 1527 **Supported formats:** 1528 1529 - `.bed`: Binary PED (Plink) format. 1530 - `.pgen`: Plink2 binary genotype format. 1531 - `.vcf`: Variant Call Format. 1532 - `.pkl`: Pickle format for saving `self` in serialized form. 1533 1534 Args: 1535 file (str or pathlib.Path): 1536 Path to the file where the data will be saved. The extension of the file determines the save format. 1537 Supported extensions: `.bed`, `.pgen`, `.vcf`, `.pkl`. 1538 """ 1539 ext = Path(file).suffix.lower() 1540 if ext == '.bed': 1541 self.save_bed(file) 1542 elif ext == '.pgen': 1543 self.save_pgen(file) 1544 elif ext == '.vcf': 1545 self.save_vcf(file) 1546 elif ext == '.pkl': 1547 self.save_pickle(file) 1548 else: 1549 raise ValueError(f"Unsupported file extension: {ext}") 1550 1551 def save_bed(self, file: Union[str, Path]) -> None: 1552 """ 1553 Save the data stored in `self` to a `.bed` file. 1554 1555 Args: 1556 file (str or pathlib.Path): 1557 Path to the file where the data will be saved. It should end with `.bed`. 1558 If the provided path does not have this extension, it will be appended. 1559 """ 1560 from snputils.snp.io.write.bed import BEDWriter 1561 writer = BEDWriter(snpobj=self, filename=file) 1562 writer.write() 1563 1564 def save_pgen(self, file: Union[str, Path]) -> None: 1565 """ 1566 Save the data stored in `self` to a `.pgen` file. 1567 1568 Args: 1569 file (str or pathlib.Path): 1570 Path to the file where the data will be saved. It should end with `.pgen`. 1571 If the provided path does not have this extension, it will be appended. 1572 """ 1573 from snputils.snp.io.write.pgen import PGENWriter 1574 writer = PGENWriter(snpobj=self, filename=file) 1575 writer.write() 1576 1577 def save_vcf(self, file: Union[str, Path]) -> None: 1578 """ 1579 Save the data stored in `self` to a `.vcf` file. 1580 1581 Args: 1582 file (str or pathlib.Path): 1583 Path to the file where the data will be saved. It should end with `.vcf`. 1584 If the provided path does not have this extension, it will be appended. 1585 """ 1586 from snputils.snp.io.write.vcf import VCFWriter 1587 writer = VCFWriter(snpobj=self, filename=file) 1588 writer.write() 1589 1590 def save_pickle(self, file: Union[str, Path]) -> None: 1591 """ 1592 Save `self` in serialized form to a `.pkl` file. 1593 1594 Args: 1595 file (str or pathlib.Path): 1596 Path to the file where the data will be saved. It should end with `.pkl`. 1597 If the provided path does not have this extension, it will be appended. 1598 """ 1599 import pickle 1600 with open(file, 'wb') as file: 1601 pickle.dump(self, file) 1602 1603 @staticmethod 1604 def _match_to_replace(val: Union[str, int, float], dictionary: Dict[Any, Any], regex: bool = True) -> Union[str, int, float]: 1605 """ 1606 Find a matching key in the provided dictionary for the given value `val` 1607 and replace it with the corresponding value. 1608 1609 Args: 1610 val (str, int, or float): 1611 The value to be matched and potentially replaced. 1612 dictionary (Dict): 1613 A dictionary containing keys and values for matching and replacement. 1614 The keys should match the data type of `val`. 1615 regex (bool): 1616 If True, interprets keys in `dictionary` as regular expressions. 1617 Default is True. 1618 1619 Returns: 1620 str, int, or float: 1621 The replacement value from `dictionary` if a match is found; otherwise, the original `val`. 1622 """ 1623 if regex: 1624 # Use regular expression matching to find replacements 1625 for key, value in dictionary.items(): 1626 if isinstance(key, str): 1627 match = re.match(key, val) 1628 if match: 1629 # Replace using the first matching regex pattern 1630 return re.sub(key, value, val) 1631 # Return the original value if no regex match is found 1632 return val 1633 else: 1634 # Return the value for `val` if present in `dictionary`; otherwise, return `val` 1635 return dictionary.get(val, val) 1636 1637 @staticmethod 1638 def _get_chromosome_number(chrom_string: str) -> Union[int, str]: 1639 """ 1640 Extracts the chromosome number from the given chromosome string. 1641 1642 Args: 1643 chrom_string (str): 1644 The chromosome identifier. 1645 1646 Returns: 1647 int or str: 1648 The numeric representation of the chromosome if detected. 1649 Returns 10001 for 'X' or 'chrX', 10002 for 'Y' or 'chrY', 1650 and the original `chrom_string` if unrecognized. 1651 """ 1652 if chrom_string.isdigit(): 1653 return int(chrom_string) 1654 else: 1655 chrom_num = re.search(r'\d+', chrom_string) 1656 if chrom_num: 1657 return int(chrom_num.group()) 1658 elif chrom_string.lower() in ['x', 'chrx']: 1659 return 10001 1660 elif chrom_string.lower() in ['y', 'chry']: 1661 return 10002 1662 else: 1663 log.warning(f"Chromosome nomenclature not standard. Chromosome: {chrom_string}") 1664 return chrom_string 1665 1666 def _sanity_check(self) -> None: 1667 """ 1668 Perform sanity checks to ensure LAI and ancestry map consistency. 1669 1670 This method checks that all unique ancestries in the LAI data are represented 1671 in the ancestry map if it is provided. 1672 """ 1673 if self.__calldata_lai is not None and self.__ancestry_map is not None: 1674 unique_ancestries = np.unique(self.__calldata_lai) 1675 missing_ancestries = [anc for anc in unique_ancestries if str(anc) not in self.__ancestry_map] 1676 if missing_ancestries: 1677 warnings.warn(f"Missing ancestries in ancestry_map: {missing_ancestries}")
A class for Single Nucleotide Polymorphism (SNP) data, with optional support for SNP-level Local Ancestry Information (LAI).
22 def __init__( 23 self, 24 calldata_gt: Optional[np.ndarray] = None, 25 samples: Optional[np.ndarray] = None, 26 variants_ref: Optional[np.ndarray] = None, 27 variants_alt: Optional[np.ndarray] = None, 28 variants_chrom: Optional[np.ndarray] = None, 29 variants_filter_pass: Optional[np.ndarray] = None, 30 variants_id: Optional[np.ndarray] = None, 31 variants_pos: Optional[np.ndarray] = None, 32 variants_qual: Optional[np.ndarray] = None, 33 calldata_lai: Optional[np.ndarray] = None, 34 ancestry_map: Optional[Dict[str, str]] = None 35 ) -> None: 36 """ 37 Args: 38 calldata_gt (array, optional): 39 An array containing genotype data for each sample. This array can be either 2D with shape 40 `(n_snps, n_samples)` if the paternal and maternal strands are averaged, or 3D with shape 41 `(n_snps, n_samples, 2)` if the strands are kept separate. 42 samples (array of shape (n_sampels,), optional): 43 An array containing unique sample identifiers. 44 variants_ref (array of shape (n_snps,), optional): 45 An array containing the reference allele for each SNP. 46 variants_alt (array of shape (n_snps,), optional): 47 An array containing the alternate allele for each SNP. 48 variants_chrom (array of shape (n_snps,), optional): 49 An array containing the chromosome for each SNP. 50 variants_filter_pass (array of shape (n_snps,), optional): 51 An array indicating whether each SNP passed control checks. 52 variants_id (array of shape (n_snps,), optional): 53 An array containing unique identifiers (IDs) for each SNP. 54 variants_pos (array of shape (n_snps,), optional): 55 An array containing the chromosomal positions for each SNP. 56 variants_qual (array of shape (n_snps,), optional): 57 An array containing the Phred-scaled quality score for each SNP. 58 calldata_lai (array, optional): 59 An array containing the ancestry for each SNP. This array can be either 2D with shape 60 `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2). 61 ancestry_map (dict of str to str, optional): 62 A dictionary mapping ancestry codes to region names. 63 """ 64 self.__calldata_gt = calldata_gt 65 self.__samples = samples 66 self.__variants_ref = variants_ref 67 self.__variants_alt = variants_alt 68 self.__variants_chrom = variants_chrom 69 self.__variants_filter_pass = variants_filter_pass 70 self.__variants_id = variants_id 71 self.__variants_pos = variants_pos 72 self.__variants_qual = variants_qual 73 self.__calldata_lai = calldata_lai 74 self.__ancestry_map = ancestry_map 75 76 self._sanity_check()
Arguments:
- calldata_gt (array, optional): An array containing genotype data for each sample. This array can be either 2D with shape
(n_snps, n_samples)
if the paternal and maternal strands are averaged, or 3D with shape(n_snps, n_samples, 2)
if the strands are kept separate. - samples (array of shape (n_sampels,), optional): An array containing unique sample identifiers.
- variants_ref (array of shape (n_snps,), optional): An array containing the reference allele for each SNP.
- variants_alt (array of shape (n_snps,), optional): An array containing the alternate allele for each SNP.
- variants_chrom (array of shape (n_snps,), optional): An array containing the chromosome for each SNP.
- variants_filter_pass (array of shape (n_snps,), optional): An array indicating whether each SNP passed control checks.
- variants_id (array of shape (n_snps,), optional): An array containing unique identifiers (IDs) for each SNP.
- variants_pos (array of shape (n_snps,), optional): An array containing the chromosomal positions for each SNP.
- variants_qual (array of shape (n_snps,), optional): An array containing the Phred-scaled quality score for each SNP.
- calldata_lai (array, optional): An array containing the ancestry for each SNP. This array can be either 2D with shape
(n_snps, n_samples*2)
, or 3D with shape (n_snps, n_samples, 2). - ancestry_map (dict of str to str, optional): A dictionary mapping ancestry codes to region names.
98 @property 99 def calldata_gt(self) -> np.ndarray: 100 """ 101 Retrieve `calldata_gt`. 102 103 Returns: 104 **array:** 105 An array containing genotype data for each sample. This array can be either 2D with shape 106 `(n_snps, n_samples)` if the paternal and maternal strands are averaged, or 3D with shape 107 `(n_snps, n_samples, 2)` if the strands are kept separate. 108 """ 109 return self.__calldata_gt
Retrieve calldata_gt
.
Returns:
array: An array containing genotype data for each sample. This array can be either 2D with shape
(n_snps, n_samples)
if the paternal and maternal strands are averaged, or 3D with shape(n_snps, n_samples, 2)
if the strands are kept separate.
118 @property 119 def samples(self) -> Optional[np.ndarray]: 120 """ 121 Retrieve `samples`. 122 123 Returns: 124 **array of shape (n_sampels,):** 125 An array containing unique sample identifiers. 126 """ 127 return self.__samples
Retrieve samples
.
Returns:
array of shape (n_sampels,): An array containing unique sample identifiers.
136 @property 137 def variants_ref(self) -> Optional[np.ndarray]: 138 """ 139 Retrieve `variants_ref`. 140 141 Returns: 142 **array of shape (n_snps,):** An array containing the reference allele for each SNP. 143 """ 144 return self.__variants_ref
Retrieve variants_ref
.
Returns:
array of shape (n_snps,): An array containing the reference allele for each SNP.
153 @property 154 def variants_alt(self) -> Optional[np.ndarray]: 155 """ 156 Retrieve `variants_alt`. 157 158 Returns: 159 **array of shape (n_snps,):** An array containing the alternate allele for each SNP. 160 """ 161 return self.__variants_alt
Retrieve variants_alt
.
Returns:
array of shape (n_snps,): An array containing the alternate allele for each SNP.
170 @property 171 def variants_chrom(self) -> Optional[np.ndarray]: 172 """ 173 Retrieve `variants_chrom`. 174 175 Returns: 176 **array of shape (n_snps,):** An array containing the chromosome for each SNP. 177 """ 178 return self.__variants_chrom
Retrieve variants_chrom
.
Returns:
array of shape (n_snps,): An array containing the chromosome for each SNP.
187 @property 188 def variants_filter_pass(self) -> Optional[np.ndarray]: 189 """ 190 Retrieve `variants_filter_pass`. 191 192 Returns: 193 **array of shape (n_snps,):** An array indicating whether each SNP passed control checks. 194 """ 195 return self.__variants_filter_pass
Retrieve variants_filter_pass
.
Returns:
array of shape (n_snps,): An array indicating whether each SNP passed control checks.
204 @property 205 def variants_id(self) -> Optional[np.ndarray]: 206 """ 207 Retrieve `variants_id`. 208 209 Returns: 210 **array of shape (n_snps,):** An array containing unique identifiers (IDs) for each SNP. 211 """ 212 return self.__variants_id
Retrieve variants_id
.
Returns:
array of shape (n_snps,): An array containing unique identifiers (IDs) for each SNP.
221 @property 222 def variants_pos(self) -> Optional[np.ndarray]: 223 """ 224 Retrieve `variants_pos`. 225 226 Returns: 227 **array of shape (n_snps,):** An array containing the chromosomal positions for each SNP. 228 """ 229 return self.__variants_pos
Retrieve variants_pos
.
Returns:
array of shape (n_snps,): An array containing the chromosomal positions for each SNP.
238 @property 239 def variants_qual(self) -> Optional[np.ndarray]: 240 """ 241 Retrieve `variants_qual`. 242 243 Returns: 244 **array of shape (n_snps,):** An array containing the Phred-scaled quality score for each SNP. 245 """ 246 return self.__variants_qual
Retrieve variants_qual
.
Returns:
array of shape (n_snps,): An array containing the Phred-scaled quality score for each SNP.
255 @property 256 def calldata_lai(self) -> Optional[np.ndarray]: 257 """ 258 Retrieve `calldata_lai`. 259 260 Returns: 261 **array of shape (n_snps, n_samples, 2):** 262 An array containing the ancestry for each SNP. 263 """ 264 return self.__calldata_lai
Retrieve calldata_lai
.
Returns:
array of shape (n_snps, n_samples, 2): An array containing the ancestry for each SNP.
273 @property 274 def ancestry_map(self) -> Optional[Dict[str, str]]: 275 """ 276 Retrieve `ancestry_map`. 277 278 Returns: 279 **dict of str to str:** A dictionary mapping ancestry codes to region names. 280 """ 281 return self.__ancestry_map
Retrieve ancestry_map
.
Returns:
dict of str to str: A dictionary mapping ancestry codes to region names.
290 @property 291 def n_samples(self) -> int: 292 """ 293 Retrieve `n_samples`. 294 295 Returns: 296 **int:** The total number of samples. 297 """ 298 if self.__samples is not None: 299 return len(self.__samples) 300 elif self.__calldata_gt is not None: 301 return self.__calldata_gt.shape[1] 302 elif self.__calldata_lai is not None: 303 return self.__calldata_lai.shape[1] 304 else: 305 raise ValueError("Unable to determine the total number of samples: no relevant data is available.")
307 @property 308 def n_snps(self) -> int: 309 """ 310 Retrieve `n_snps`. 311 312 Returns: 313 **int:** The total number of SNPs. 314 """ 315 # List of attributes that can indicate the number of SNPs 316 potential_attributes = [ 317 self.__calldata_gt, 318 self.__variants_ref, 319 self.__variants_alt, 320 self.__variants_chrom, 321 self.__variants_filter_pass, 322 self.__variants_id, 323 self.__variants_pos, 324 self.__variants_qual, 325 self.__calldata_lai 326 ] 327 328 # Check each attribute for its first dimension, which corresponds to `n_snps` 329 for attr in potential_attributes: 330 if attr is not None: 331 return attr.shape[0] 332 333 raise ValueError("Unable to determine the total number of SNPs: no relevant data is available.")
335 @property 336 def n_chrom(self) -> Optional[int]: 337 """ 338 Retrieve `n_chrom`. 339 340 Returns: 341 **int:** The total number of unique chromosomes in `variants_chrom`. 342 """ 343 if self.variants_chrom is None: 344 warnings.warn("Chromosome data `variants_chrom` is None.") 345 return None 346 347 return len(self.unique_chrom)
349 @property 350 def n_ancestries(self) -> int: 351 """ 352 Retrieve `n_ancestries`. 353 354 Returns: 355 **int:** The total number of unique ancestries. 356 """ 357 if self.__calldata_lai is not None: 358 return len(np.unique(self.__calldata_lai)) 359 else: 360 raise ValueError("Unable to determine the total number of ancestries: no relevant data is available.")
362 @property 363 def unique_chrom(self) -> Optional[np.ndarray]: 364 """ 365 Retrieve `unique_chrom`. 366 367 Returns: 368 **array:** The unique chromosome names in `variants_chrom`, preserving their order of appearance. 369 """ 370 if self.variants_chrom is None: 371 warnings.warn("Chromosome data `variants_chrom` is None.") 372 return None 373 374 # Identify unique chromosome names and their first indexes of occurrence 375 _, idx = np.unique(self.variants_chrom, return_index=True) 376 # Return chromosome names sorted by their first occurrence to maintain original order 377 return self.variants_chrom[np.sort(idx)]
Retrieve unique_chrom
.
Returns:
array: The unique chromosome names in
variants_chrom
, preserving their order of appearance.
379 @property 380 def are_strands_summed(self) -> bool: 381 """ 382 Retrieve `are_strands_summed`. 383 384 Returns: 385 **bool:** 386 True if the maternal and paternal strands have been summed together, which is indicated by 387 `calldata_gt` having shape `(n_samples, n_snps)`. False if the strands are stored separately, 388 indicated by `calldata_gt` having shape `(n_samples, n_snps, 2)`. 389 """ 390 if self.calldata_gt is None: 391 warnings.warn("Genotype data `calldata_gt` is None.") 392 return None 393 394 return self.calldata_gt.ndim == 2
Retrieve are_strands_summed
.
Returns:
bool: True if the maternal and paternal strands have been summed together, which is indicated by
calldata_gt
having shape(n_samples, n_snps)
. False if the strands are stored separately, indicated bycalldata_gt
having shape(n_samples, n_snps, 2)
.
396 def copy(self) -> 'SNPObject': 397 """ 398 Create and return a copy of `self`. 399 400 Returns: 401 **SNPObject:** 402 A new instance of the current object. 403 """ 404 return copy.deepcopy(self)
Create and return a copy of self
.
Returns:
SNPObject: A new instance of the current object.
406 def keys(self) -> List[str]: 407 """ 408 Retrieve a list of public attribute names for `self`. 409 410 Returns: 411 **list of str:** 412 A list of attribute names, with internal name-mangling removed, 413 for easier reference to public attributes in the instance. 414 """ 415 return [attr.replace('_SNPObject__', '') for attr in vars(self)]
Retrieve a list of public attribute names for self
.
Returns:
list of str: A list of attribute names, with internal name-mangling removed, for easier reference to public attributes in the instance.
417 def filter_variants( 418 self, 419 chrom: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 420 pos: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 421 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 422 include: bool = True, 423 inplace: bool = False 424 ) -> Optional['SNPObject']: 425 """ 426 Filter variants based on specified chromosome names, variant positions, or variant indexes. 427 428 This method updates the `calldata_gt`, `variants_ref`, `variants_alt`, 429 `variants_chrom`, `variants_filter_pass`, `variants_id`, `variants_pos`, 430 `variants_qual`, and `lai` attributes to include or exclude the specified variants. The filtering 431 criteria can be based on chromosome names, variant positions, or indexes. If multiple 432 criteria are provided, their union is used for filtering. The order of the variants is preserved. 433 434 Negative indexes are supported and follow 435 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 436 437 Args: 438 chrom (str or array_like of str, optional): 439 Chromosome(s) to filter variants by. Can be a single chromosome as a string or a sequence 440 of chromosomes. If both `chrom` and `pos` are provided, they must either have matching lengths 441 (pairing each chromosome with a position) or `chrom` should be a single value that applies to 442 all positions in `pos`. Default is None. 443 pos (int or array_like of int, optional): 444 Position(s) to filter variants by. Can be a single position as an integer or a sequence of positions. 445 If `chrom` is also provided, `pos` should either match `chrom` in length or `chrom` should be a 446 single value. Default is None. 447 indexes (int or array_like of int, optional): 448 Index(es) of the variants to include or exclude. Can be a single index or a sequence 449 of indexes. Negative indexes are supported. Default is None. 450 include (bool, default=True): 451 If True, includes only the specified variants. If False, excludes the specified 452 variants. Default is True. 453 inplace (bool, default=False): 454 If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 455 filtered. Default is False. 456 457 Returns: 458 **Optional[SNPObject]:** 459 A new `SNPObject` with the specified variants filtered if `inplace=False`. 460 If `inplace=True`, modifies `self` in place and returns None. 461 """ 462 if chrom is None and pos is None and indexes is None: 463 raise ValueError("At least one of 'chrom', 'pos', or 'indexes' must be provided.") 464 465 n_snps = self.n_snps 466 467 # Convert inputs to arrays for consistency 468 chrom = np.atleast_1d(chrom) if chrom is not None else None 469 pos = np.atleast_1d(pos) if pos is not None else None 470 indexes = np.atleast_1d(indexes) if indexes is not None else None 471 472 # Validate chrom and pos lengths if both are provided 473 if chrom is not None and pos is not None: 474 if len(chrom) != len(pos) and len(chrom) > 1: 475 raise ValueError( 476 "When both 'chrom' and 'pos' are provided, they must either be of the same length " 477 "or 'chrom' must be a single value." 478 ) 479 480 # Create a mask for chromosome and position filtering 481 mask_combined = np.zeros(n_snps, dtype=bool) 482 if chrom is not None and pos is not None: 483 if len(chrom) == 1: 484 # Apply single chromosome to all positions in `pos` 485 mask_combined = (self['variants_chrom'] == chrom[0]) & np.isin(self['variants_pos'], pos) 486 else: 487 # Vectorized pair matching for chrom and pos 488 query_pairs = np.array( 489 list(zip(chrom, pos)), 490 dtype=[ 491 ('chrom', self['variants_chrom'].dtype), 492 ('pos', self['variants_pos'].dtype) 493 ] 494 ) 495 data_pairs = np.array( 496 list(zip(self['variants_chrom'], self['variants_pos'])), 497 dtype=[ 498 ('chrom', self['variants_chrom'].dtype), 499 ('pos', self['variants_pos'].dtype) 500 ] 501 ) 502 mask_combined = np.isin(data_pairs, query_pairs) 503 504 elif chrom is not None: 505 # Only chromosome filtering 506 mask_combined = np.isin(self['variants_chrom'], chrom) 507 elif pos is not None: 508 # Only position filtering 509 mask_combined = np.isin(self['variants_pos'], pos) 510 511 # Create mask based on indexes if provided 512 if indexes is not None: 513 # Validate indexes, allowing negative indexes 514 out_of_bounds_indexes = indexes[(indexes < -n_snps) | (indexes >= n_snps)] 515 if out_of_bounds_indexes.size > 0: 516 raise ValueError(f"One or more sample indexes are out of bounds.") 517 518 # Handle negative indexes and check for out-of-bounds indexes 519 adjusted_indexes = np.mod(indexes, n_snps) 520 521 # Create mask for specified indexes 522 mask_indexes = np.zeros(n_snps, dtype=bool) 523 mask_indexes[adjusted_indexes] = True 524 525 # Combine with `chrom` and `pos` mask using logical OR (union of all specified criteria) 526 mask_combined = mask_combined | mask_indexes 527 528 # Invert mask if `include` is False 529 if not include: 530 mask_combined = ~mask_combined 531 532 # Define keys to filter 533 keys = [ 534 'calldata_gt', 'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 535 'variants_id', 'variants_pos', 'variants_qual', 'calldata_lai' 536 ] 537 538 # Apply filtering based on inplace parameter 539 if inplace: 540 for key in keys: 541 if self[key] is not None: 542 if self[key].ndim > 1: 543 self[key] = np.asarray(self[key])[mask_combined, ...] 544 else: 545 self[key] = np.asarray(self[key])[mask_combined] 546 547 return None 548 else: 549 # Create A new `SNPObject` with filtered data 550 snpobj = self.copy() 551 for key in keys: 552 if snpobj[key] is not None: 553 if snpobj[key].ndim > 1: 554 snpobj[key] = np.asarray(snpobj[key])[mask_combined, ...] 555 else: 556 snpobj[key] = np.asarray(snpobj[key])[mask_combined] 557 558 return snpobj
Filter variants based on specified chromosome names, variant positions, or variant indexes.
This method updates the calldata_gt
, variants_ref
, variants_alt
,
variants_chrom
, variants_filter_pass
, variants_id
, variants_pos
,
variants_qual
, and lai
attributes to include or exclude the specified variants. The filtering
criteria can be based on chromosome names, variant positions, or indexes. If multiple
criteria are provided, their union is used for filtering. The order of the variants is preserved.
Negative indexes are supported and follow NumPy's indexing conventions.
Arguments:
- chrom (str or array_like of str, optional): Chromosome(s) to filter variants by. Can be a single chromosome as a string or a sequence
of chromosomes. If both
chrom
andpos
are provided, they must either have matching lengths (pairing each chromosome with a position) orchrom
should be a single value that applies to all positions inpos
. Default is None. - pos (int or array_like of int, optional): Position(s) to filter variants by. Can be a single position as an integer or a sequence of positions.
If
chrom
is also provided,pos
should either matchchrom
in length orchrom
should be a single value. Default is None. - indexes (int or array_like of int, optional): Index(es) of the variants to include or exclude. Can be a single index or a sequence of indexes. Negative indexes are supported. Default is None.
- include (bool, default=True): If True, includes only the specified variants. If False, excludes the specified variants. Default is True.
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the variants filtered. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with the specified variants filtered ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
560 def filter_samples( 561 self, 562 samples: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 563 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 564 include: bool = True, 565 inplace: bool = False 566 ) -> Optional['SNPObject']: 567 """ 568 Filter samples based on specified names or indexes. 569 570 This method updates the `samples` and `calldata_gt` attributes to include or exclude the specified 571 samples. The order of the samples is preserved. 572 573 If both samples and indexes are provided, any sample matching either a name in samples or an index in 574 indexes will be included or excluded. 575 576 This method allows inclusion or exclusion of specific samples by their names or 577 indexes. When both sample names and indexes are provided, the union of the specified samples 578 is used. Negative indexes are supported and follow 579 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 580 581 Args: 582 samples (str or array_like of str, optional): 583 Name(s) of the samples to include or exclude. Can be a single sample name or a 584 sequence of sample names. Default is None. 585 indexes (int or array_like of int, optional): 586 Index(es) of the samples to include or exclude. Can be a single index or a sequence 587 of indexes. Negative indexes are supported. Default is None. 588 include (bool, default=True): 589 If True, includes only the specified samples. If False, excludes the specified 590 samples. Default is True. 591 inplace (bool, default=False): 592 If True, modifies `self` in place. If False, returns a new `SNPObject` with the samples 593 filtered. Default is False. 594 595 Returns: 596 **Optional[SNPObject]:** 597 A new `SNPObject` with the specified samples filtered if `inplace=False`. 598 If `inplace=True`, modifies `self` in place and returns None. 599 """ 600 if samples is None and indexes is None: 601 raise ValueError("At least one of 'samples' or 'indexes' must be provided.") 602 603 n_samples = self.n_samples 604 sample_names = np.array(self['samples']) 605 606 # Create mask based on sample names 607 if samples is not None: 608 samples = np.atleast_1d(samples) 609 mask_samples = np.isin(sample_names, samples) 610 missing_samples = samples[~np.isin(samples, sample_names)] 611 if missing_samples.size > 0: 612 raise ValueError(f"The following specified samples were not found: {missing_samples.tolist()}") 613 else: 614 mask_samples = np.zeros(n_samples, dtype=bool) 615 616 # Create mask based on sample indexes 617 if indexes is not None: 618 indexes = np.atleast_1d(indexes) 619 620 # Validate indexes, allowing negative indexes 621 out_of_bounds_indexes = indexes[(indexes < -n_samples) | (indexes >= n_samples)] 622 if out_of_bounds_indexes.size > 0: 623 raise ValueError(f"One or more sample indexes are out of bounds.") 624 625 # Handle negative indexes 626 adjusted_indexes = np.mod(indexes, n_samples) 627 628 mask_indexes = np.zeros(n_samples, dtype=bool) 629 mask_indexes[adjusted_indexes] = True 630 else: 631 mask_indexes = np.zeros(n_samples, dtype=bool) 632 633 # Combine masks using logical OR (union of samples) 634 mask_combined = mask_samples | mask_indexes 635 636 if not include: 637 mask_combined = ~mask_combined 638 639 # Define keys to filter 640 keys = ['samples', 'calldata_gt', 'calldata_lai'] 641 642 # Apply filtering based on inplace parameter 643 if inplace: 644 for key in keys: 645 if self[key] is not None: 646 if self[key].ndim > 1: 647 self[key] = np.asarray(self[key])[:, mask_combined, ...] 648 else: 649 self[key] = np.asarray(self[key])[mask_combined] 650 651 return None 652 else: 653 # Create A new `SNPObject` with filtered data 654 snpobj = self.copy() 655 for key in keys: 656 if snpobj[key] is not None: 657 if snpobj[key].ndim > 1: 658 snpobj[key] = np.asarray(snpobj[key])[:, mask_combined, ...] 659 else: 660 snpobj[key] = np.asarray(snpobj[key])[mask_combined] 661 return snpobj
Filter samples based on specified names or indexes.
This method updates the samples
and calldata_gt
attributes to include or exclude the specified
samples. The order of the samples is preserved.
If both samples and indexes are provided, any sample matching either a name in samples or an index in indexes will be included or excluded.
This method allows inclusion or exclusion of specific samples by their names or indexes. When both sample names and indexes are provided, the union of the specified samples is used. Negative indexes are supported and follow NumPy's indexing conventions.
Arguments:
- samples (str or array_like of str, optional): Name(s) of the samples to include or exclude. Can be a single sample name or a sequence of sample names. Default is None.
- indexes (int or array_like of int, optional): Index(es) of the samples to include or exclude. Can be a single index or a sequence of indexes. Negative indexes are supported. Default is None.
- include (bool, default=True): If True, includes only the specified samples. If False, excludes the specified samples. Default is True.
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the samples filtered. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with the specified samples filtered ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
663 def detect_chromosome_format(self) -> str: 664 """ 665 Detect the chromosome naming convention in `variants_chrom` based on the prefix 666 of the first chromosome identifier in `unique_chrom`. 667 668 **Recognized formats:** 669 670 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 671 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 672 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 673 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 674 675 If the format does not match any recognized pattern, `'Unknown format'` is returned. 676 677 Returns: 678 **str:** 679 A string indicating the detected chromosome format (`'chr'`, `'chm'`, `'chrom'`, or `'plain'`). 680 If no recognized format is matched, returns `'Unknown format'`. 681 """ 682 # Select the first unique chromosome identifier for format detection 683 chromosome_str = self.unique_chrom[0] 684 685 # Define regular expressions to match each recognized chromosome format 686 patterns = { 687 'chr': r'^chr(\d+|X|Y|M)$', # Matches 'chr' prefixed format 688 'chm': r'^chm(\d+|X|Y|M)$', # Matches 'chm' prefixed format 689 'chrom': r'^chrom(\d+|X|Y|M)$', # Matches 'chrom' prefixed format 690 'plain': r'^(\d+|X|Y|M)$' # Matches plain format without prefix 691 } 692 693 # Iterate through the patterns to identify the chromosome format 694 for prefix, pattern in patterns.items(): 695 if re.match(pattern, chromosome_str): 696 return prefix # Return the recognized format prefix 697 698 # If no pattern matches, return 'Unknown format' 699 return 'Unknown format'
Detect the chromosome naming convention in variants_chrom
based on the prefix
of the first chromosome identifier in unique_chrom
.
Recognized formats:
'chr'
: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.'chm'
: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.'chrom'
: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.'plain'
: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
If the format does not match any recognized pattern, 'Unknown format'
is returned.
Returns:
str: A string indicating the detected chromosome format (
'chr'
,'chm'
,'chrom'
, or'plain'
). If no recognized format is matched, returns'Unknown format'
.
701 def convert_chromosome_format( 702 self, 703 from_format: str, 704 to_format: str, 705 inplace: bool = False 706 ) -> Optional['SNPObject']: 707 """ 708 Convert the chromosome format from one naming convention to another in `variants_chrom`. 709 710 **Supported formats:** 711 712 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 713 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 714 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 715 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 716 717 Args: 718 from_format (str): 719 The current chromosome format. Acceptable values are `'chr'`, `'chm'`, `'chrom'`, or `'plain'`. 720 to_format (str): 721 The target format for chromosome data conversion. Acceptable values match `from_format` options. 722 inplace (bool, default=False): 723 If True, modifies `self` in place. If False, returns a new `SNPObject` with the converted format. 724 Default is False. 725 726 Returns: 727 **Optional[SNPObject]:** A new `SNPObject` with the converted chromosome format if `inplace=False`. 728 If `inplace=True`, modifies `self` in place and returns None. 729 """ 730 # Define the list of standard chromosome identifiers 731 chrom_list = [*map(str, range(1, 23)), 'X', 'Y', 'M'] # M for mitochondrial chromosomes 732 733 # Format mappings for different chromosome naming conventions 734 format_mappings = { 735 'chr': [f'chr{i}' for i in chrom_list], 736 'chm': [f'chm{i}' for i in chrom_list], 737 'chrom': [f'chrom{i}' for i in chrom_list], 738 'plain': chrom_list, 739 } 740 741 # Verify that from_format and to_format are valid naming conventions 742 if from_format not in format_mappings or to_format not in format_mappings: 743 raise ValueError(f"Invalid format: {from_format} or {to_format}. Must be one of {list(format_mappings.keys())}.") 744 745 # Convert chromosomes to string for consistent comparison 746 variants_chrom = self['variants_chrom'].astype(str) 747 748 # Verify that all chromosomes in the object follow the specified `from_format` 749 expected_chroms = set(format_mappings[from_format]) 750 mismatched_chroms = set(variants_chrom) - expected_chroms 751 752 if mismatched_chroms: 753 raise ValueError(f"The following chromosomes do not match the `from_format` '{from_format}': {mismatched_chroms}.") 754 755 # Create conditions for selecting based on current `from_format` names 756 conditions = [variants_chrom == chrom for chrom in format_mappings[from_format]] 757 758 # Rename chromosomes based on inplace flag 759 if inplace: 760 self['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown') 761 return None 762 else: 763 snpobject = self.copy() 764 snpobject['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown') 765 return snpobject
Convert the chromosome format from one naming convention to another in variants_chrom
.
Supported formats:
'chr'
: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.'chm'
: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.'chrom'
: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.'plain'
: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
Arguments:
- from_format (str): The current chromosome format. Acceptable values are
'chr'
,'chm'
,'chrom'
, or'plain'
. - to_format (str): The target format for chromosome data conversion. Acceptable values match
from_format
options. - inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the converted format. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with the converted chromosome format ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
767 def match_chromosome_format(self, snpobj: 'SNPObject', inplace: bool = False) -> Optional['SNPObject']: 768 """ 769 Convert the chromosome format in `variants_chrom` from `self` to match the format of a reference `snpobj`. 770 771 **Recognized formats:** 772 773 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 774 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 775 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 776 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 777 778 Args: 779 snpobj (SNPObject): 780 The reference SNPObject whose chromosome format will be matched. 781 inplace (bool, default=False): 782 If True, modifies `self` in place. If False, returns a new `SNPObject` with the 783 chromosome format matching that of `snpobj`. Default is False. 784 785 Returns: 786 **Optional[SNPObject]:** 787 A new `SNPObject` with matched chromosome format if `inplace=False`. 788 If `inplace=True`, modifies `self` in place and returns None. 789 """ 790 # Detect the chromosome naming format of the current SNPObject 791 fmt1 = self.detect_chromosome_format() 792 if fmt1 == 'Unknown format': 793 raise ValueError("The chromosome format of the current SNPObject is unrecognized.") 794 795 # Detect the chromosome naming format of the reference SNPObject 796 fmt2 = snpobj.detect_chromosome_format() 797 if fmt2 == 'Unknown format': 798 raise ValueError("The chromosome format of the reference SNPObject is unrecognized.") 799 800 # Convert the current SNPObject's chromosome format to match the reference format 801 return self.convert_chromosome_format(fmt1, fmt2, inplace=inplace)
Convert the chromosome format in variants_chrom
from self
to match the format of a reference snpobj
.
Recognized formats:
'chr'
: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.'chm'
: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.'chrom'
: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.'plain'
: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
Arguments:
- snpobj (SNPObject): The reference SNPObject whose chromosome format will be matched.
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the chromosome format matching that ofsnpobj
. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with matched chromosome format ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
803 def rename_chrom( 804 self, 805 to_replace: Union[Dict[str, str], str, List[str]] = {'^([0-9]+)$': r'chr\1', r'^chr([0-9]+)$': r'\1'}, 806 value: Optional[Union[str, List[str]]] = None, 807 regex: bool = True, 808 inplace: bool = False 809 ) -> Optional['SNPObject']: 810 """ 811 Replace chromosome values in `variants_chrom` using patterns or exact matches. 812 813 This method allows flexible chromosome replacements, using regex or exact matches, useful 814 for non-standard chromosome formats. For standard conversions (e.g., 'chr1' to '1'), 815 consider `convert_chromosome_format`. 816 817 Args: 818 to_replace (dict, str, or list of str): 819 Pattern(s) or exact value(s) to be replaced in chromosome names. Default behavior 820 transforms `<chrom_num>` to `chr<chrom_num>` or vice versa. Non-matching values 821 remain unchanged. 822 - If str or list of str: Matches will be replaced with `value`. 823 - If regex (bool), then any regex matches will be replaced with `value`. 824 - If dict: Keys defines values to replace, with corresponding replacements as values. 825 value (str or list of str, optional): 826 Replacement value(s) if `to_replace` is a string or list. Ignored if `to_replace` 827 is a dictionary. 828 regex (bool, default=True): 829 If True, interprets `to_replace` keys as regex patterns. 830 inplace (bool, default=False): 831 If True, modifies `self` in place. If False, returns a new `SNPObject` with the chromosomes 832 renamed. Default is False. 833 834 Returns: 835 **Optional[SNPObject]:** A new `SNPObject` with the renamed chromosome format if `inplace=False`. 836 If `inplace=True`, modifies `self` in place and returns None. 837 """ 838 # Standardize input format: convert `to_replace` and `value` to a dictionary if needed 839 if isinstance(to_replace, (str, int)): 840 to_replace = [to_replace] 841 if isinstance(value, (str, int)): 842 value = [value] 843 if isinstance(to_replace, list) and isinstance(value, list): 844 dictionary = dict(zip(to_replace, value)) 845 elif isinstance(to_replace, dict) and value is None: 846 dictionary = to_replace 847 else: 848 raise ValueError( 849 "Invalid input: `to_replace` and `value` must be compatible types (both str, list of str, or dict)." 850 ) 851 852 # Vectorized function for replacing values in chromosome array 853 vec_replace_values = np.vectorize(self._match_to_replace) 854 855 # Rename chromosomes based on inplace flag 856 if inplace: 857 self.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex) 858 return None 859 else: 860 snpobj = self.copy() 861 snpobj.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex) 862 return snpobj
Replace chromosome values in variants_chrom
using patterns or exact matches.
This method allows flexible chromosome replacements, using regex or exact matches, useful
for non-standard chromosome formats. For standard conversions (e.g., 'chr1' to '1'),
consider convert_chromosome_format
.
Arguments:
- to_replace (dict, str, or list of str): Pattern(s) or exact value(s) to be replaced in chromosome names. Default behavior
transforms
<chrom_num>
tochr<chrom_num>
or vice versa. Non-matching values remain unchanged.- If str or list of str: Matches will be replaced with
value
. - If regex (bool), then any regex matches will be replaced with
value
. - If dict: Keys defines values to replace, with corresponding replacements as values.
- If str or list of str: Matches will be replaced with
- value (str or list of str, optional): Replacement value(s) if
to_replace
is a string or list. Ignored ifto_replace
is a dictionary. - regex (bool, default=True): If True, interprets
to_replace
keys as regex patterns. - inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the chromosomes renamed. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with the renamed chromosome format ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
864 def rename_missings( 865 self, 866 before: Union[int, float, str] = -1, 867 after: Union[int, float, str] = '.', 868 inplace: bool = False 869 ) -> Optional['SNPObject']: 870 """ 871 Replace missing values in the `calldata_gt` attribute. 872 873 This method identifies missing values in 'calldata_gt' and replaces them with a specified 874 value. By default, it replaces occurrences of `-1` (often used to signify missing data) with `'.'`. 875 876 Args: 877 before (int, float, or str, default=-1): 878 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 879 Default is -1. 880 after (int, float, or str, default='.'): 881 The value that will replace `before`. Default is '.'. 882 inplace (bool, default=False): 883 If True, modifies `self` in place. If False, returns a new `SNPObject` with the applied 884 replacements. Default is False. 885 886 Returns: 887 **Optional[SNPObject]:** 888 A new `SNPObject` with the renamed missing values if `inplace=False`. 889 If `inplace=True`, modifies `self` in place and returns None. 890 """ 891 # Rename missing values in the `calldata_gt` attribute based on inplace flag 892 if inplace: 893 self['calldata_gt'] = np.where(self['calldata_gt'] == before, after, self['calldata_gt']) 894 return None 895 else: 896 snpobj = self.copy() 897 snpobj['calldata_gt'] = np.where(snpobj['calldata_gt'] == before, after, snpobj['calldata_gt']) 898 return snpobj
Replace missing values in the calldata_gt
attribute.
This method identifies missing values in 'calldata_gt' and replaces them with a specified
value. By default, it replaces occurrences of -1
(often used to signify missing data) with '.'
.
Arguments:
- before (int, float, or str, default=-1): The current representation of missing values in
calldata_gt
. Common values might be -1, '.', or NaN. Default is -1. - after (int, float, or str, default='.'): The value that will replace
before
. Default is '.'. - inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the applied replacements. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with the renamed missing values ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
900 def get_common_variants_intersection( 901 self, 902 snpobj: 'SNPObject', 903 index_by: str = 'pos' 904 ) -> Tuple[List[str], np.ndarray, np.ndarray]: 905 """ 906 Identify common variants between `self` and the `snpobj` instance based on the specified `index_by` criterion, 907 which may match based on chromosome and position (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both. 908 909 This method returns the identifiers of common variants and their corresponding indices in both objects. 910 911 Args: 912 snpobj (SNPObject): 913 The reference SNPObject to compare against. 914 index_by (str, default='pos'): 915 Criteria for matching variants. Options: 916 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 917 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 918 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 919 Default is 'pos'. 920 921 Returns: 922 Tuple containing: 923 - **list of str:** A list of common variant identifiers (as strings). 924 - **array:** An array of indices in `self` where common variants are located. 925 - **array:** An array of indices in `snpobj` where common variants are located. 926 """ 927 # Create unique identifiers for each variant in both SNPObjects based on the specified criterion 928 if index_by == 'pos': 929 query_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(self['variants_chrom'], self['variants_pos'])] 930 reference_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(snpobj['variants_chrom'], snpobj['variants_pos'])] 931 elif index_by == 'id': 932 query_identifiers = self['variants_id'].tolist() 933 reference_identifiers = snpobj['variants_id'].tolist() 934 elif index_by == 'pos+id': 935 query_identifiers = [ 936 f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(self['variants_chrom'], self['variants_pos'], self['variants_id']) 937 ] 938 reference_identifiers = [ 939 f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_id']) 940 ] 941 else: 942 raise ValueError("`index_by` must be one of 'pos', 'id', or 'pos+id'.") 943 944 # Convert to sets for intersection 945 common_ids = set(query_identifiers).intersection(reference_identifiers) 946 947 # Collect indices for common identifiers 948 query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids] 949 reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids] 950 951 return list(common_ids), np.array(query_idx), np.array(reference_idx)
Identify common variants between self
and the snpobj
instance based on the specified index_by
criterion,
which may match based on chromosome and position (variants_chrom
, variants_pos
), ID (variants_id
), or both.
This method returns the identifiers of common variants and their corresponding indices in both objects.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- index_by (str, default='pos'): Criteria for matching variants. Options:
'pos'
: Matches by chromosome and position (variants_chrom
,variants_pos
), e.g., 'chr1-12345'.'id'
: Matches by variant ID alone (variants_id
), e.g., 'rs123'.'pos+id'
: Matches by chromosome, position, and ID (variants_chrom
,variants_pos
,variants_id
), e.g., 'chr1-12345-rs123'. Default is 'pos'.
Returns:
Tuple containing:
- list of str: A list of common variant identifiers (as strings).
- array: An array of indices in
self
where common variants are located.- array: An array of indices in
snpobj
where common variants are located.
953 def get_common_markers_intersection( 954 self, 955 snpobj: 'SNPObject' 956 ) -> Tuple[List[str], np.ndarray, np.ndarray]: 957 """ 958 Identify common markers between between `self` and the `snpobj` instance. Common markers are identified 959 based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 960 and alternate (`variants_alt`) alleles. 961 962 This method returns the identifiers of common markers and their corresponding indices in both objects. 963 964 Args: 965 snpobj (SNPObject): 966 The reference SNPObject to compare against. 967 968 Returns: 969 Tuple containing: 970 - **list of str:** A list of common variant identifiers (as strings). 971 - **array:** An array of indices in `self` where common variants are located. 972 - **array:** An array of indices in `snpobj` where common variants are located. 973 """ 974 # Generate unique identifiers based on chrom, pos, ref, and alt alleles 975 query_identifiers = [ 976 f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 977 zip(self['variants_chrom'], self['variants_pos'], self['variants_ref'], self['variants_alt']) 978 ] 979 reference_identifiers = [ 980 f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 981 zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_ref'], snpobj['variants_alt']) 982 ] 983 984 # Convert to sets for intersection 985 common_ids = set(query_identifiers).intersection(reference_identifiers) 986 987 # Collect indices for common identifiers in both SNPObjects 988 query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids] 989 reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids] 990 991 return list(common_ids), np.array(query_idx), np.array(reference_idx)
Identify common markers between between self
and the snpobj
instance. Common markers are identified
based on matching chromosome (variants_chrom
), position (variants_pos
), reference (variants_ref
),
and alternate (variants_alt
) alleles.
This method returns the identifiers of common markers and their corresponding indices in both objects.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
Returns:
Tuple containing:
- list of str: A list of common variant identifiers (as strings).
- array: An array of indices in
self
where common variants are located.- array: An array of indices in
snpobj
where common variants are located.
993 def subset_to_common_variants( 994 self, 995 snpobj: 'SNPObject', 996 index_by: str = 'pos', 997 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 998 inplace: bool = False 999 ) -> Optional['SNPObject']: 1000 """ 1001 Subset `self` to include only the common variants with a reference `snpobj` based on 1002 the specified `index_by` criterion, which may match based on chromosome and position 1003 (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both. 1004 1005 Args: 1006 snpobj (SNPObject): 1007 The reference SNPObject to compare against. 1008 index_by (str, default='pos'): 1009 Criteria for matching variants. Options: 1010 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1011 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1012 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1013 Default is 'pos'. 1014 common_variants_intersection (Tuple[np.ndarray, np.ndarray], optional): 1015 Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 1016 computed within the function. 1017 inplace (bool, default=False): 1018 If True, modifies `self` in place. If False, returns a new `SNPObject` with the common variants 1019 subsetted. Default is False. 1020 1021 Returns: 1022 **Optional[SNPObject]:** 1023 Optional[SNPObject]: A new `SNPObject` with the common variants subsetted if `inplace=False`. 1024 If `inplace=True`, modifies `self` in place and returns None. 1025 """ 1026 # Get indices of common variants if not provided 1027 if common_variants_intersection is None: 1028 _, query_idx, _ = self.get_common_variants_intersection(snpobj, index_by=index_by) 1029 else: 1030 query_idx, _ = common_variants_intersection 1031 1032 # Use filter_variants method with the identified indices, applying `inplace` as specified 1033 return self.filter_variants(indexes=query_idx, include=True, inplace=inplace)
Subset self
to include only the common variants with a reference snpobj
based on
the specified index_by
criterion, which may match based on chromosome and position
(variants_chrom
, variants_pos
), ID (variants_id
), or both.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- index_by (str, default='pos'): Criteria for matching variants. Options:
'pos'
: Matches by chromosome and position (variants_chrom
,variants_pos
), e.g., 'chr1-12345'.'id'
: Matches by variant ID alone (variants_id
), e.g., 'rs123'.'pos+id'
: Matches by chromosome, position, and ID (variants_chrom
,variants_pos
,variants_id
), e.g., 'chr1-12345-rs123'. Default is 'pos'.
- common_variants_intersection (Tuple[np.ndarray, np.ndarray], optional): Precomputed indices of common variants between
self
andsnpobj
. If None, intersection is computed within the function. - inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the common variants subsetted. Default is False.
Returns:
Optional[SNPObject]: Optional[SNPObject]: A new
SNPObject
with the common variants subsetted ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1035 def subset_to_common_markers( 1036 self, 1037 snpobj: 'SNPObject', 1038 common_markers_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1039 inplace: bool = False 1040 ) -> Optional['SNPObject']: 1041 """ 1042 Subset `self` to include only the common markers with a reference `snpobj`. Common markers are identified 1043 based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 1044 and alternate (`variants_alt`) alleles. 1045 1046 Args: 1047 snpobj (SNPObject): 1048 The reference SNPObject to compare against. 1049 common_markers_intersection (tuple of arrays, optional): 1050 Precomputed indices of common markers between `self` and `snpobj`. If None, intersection is 1051 computed within the function. 1052 inplace (bool, default=False): 1053 If True, modifies `self` in place. If False, returns a new `SNPObject` with the common markers 1054 subsetted. Default is False. 1055 1056 Returns: 1057 **SNPObject:** 1058 **Optional[SNPObject]:** A new `SNPObject` with the common markers subsetted if `inplace=False`. 1059 If `inplace=True`, modifies `self` in place and returns None. 1060 """ 1061 # Get indices of common markers if not provided 1062 if common_markers_intersection is None: 1063 _, query_idx, _ = self.get_common_markers_intersection(snpobj) 1064 else: 1065 query_idx, _ = common_markers_intersection 1066 1067 # Use filter_variants method with the identified indices, applying `inplace` as specified 1068 return self.filter_variants(indexes=query_idx, include=True, inplace=inplace)
Subset self
to include only the common markers with a reference snpobj
. Common markers are identified
based on matching chromosome (variants_chrom
), position (variants_pos
), reference (variants_ref
),
and alternate (variants_alt
) alleles.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- common_markers_intersection (tuple of arrays, optional): Precomputed indices of common markers between
self
andsnpobj
. If None, intersection is computed within the function. - inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the common markers subsetted. Default is False.
Returns:
SNPObject: Optional[SNPObject]: A new
SNPObject
with the common markers subsetted ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1070 def remove_strand_ambiguous_variants(self, inplace: bool = False) -> Optional['SNPObject']: 1071 """ 1072 A strand-ambiguous variant has reference (`variants_ref`) and alternate (`variants_alt`) alleles 1073 in the pairs A/T, T/A, C/G, or G/C, where both alleles are complementary and thus indistinguishable 1074 in terms of strand orientation. 1075 1076 Args: 1077 inplace (bool, default=False): 1078 If True, modifies `self` in place. If False, returns a new `SNPObject` with the 1079 strand-ambiguous variants removed. Default is False. 1080 1081 Returns: 1082 **Optional[SNPObject]:** A new `SNPObject` with non-ambiguous variants only if `inplace=False`. 1083 If `inplace=True`, modifies `self` in place and returns None. 1084 """ 1085 # Identify strand-ambiguous SNPs using vectorized comparisons 1086 is_AT = (self['variants_ref'] == 'A') & (self['variants_alt'] == 'T') 1087 is_TA = (self['variants_ref'] == 'T') & (self['variants_alt'] == 'A') 1088 is_CG = (self['variants_ref'] == 'C') & (self['variants_alt'] == 'G') 1089 is_GC = (self['variants_ref'] == 'G') & (self['variants_alt'] == 'C') 1090 1091 # Create a combined mask for all ambiguous variants 1092 ambiguous_mask = is_AT | is_TA | is_CG | is_GC 1093 non_ambiguous_idx = np.where(~ambiguous_mask)[0] 1094 1095 # Count each type of ambiguity using numpy's sum on boolean arrays 1096 A_T_count = np.sum(is_AT) 1097 T_A_count = np.sum(is_TA) 1098 C_G_count = np.sum(is_CG) 1099 G_C_count = np.sum(is_GC) 1100 1101 # Log the counts of each type of strand-ambiguous variants 1102 total_ambiguous = A_T_count + T_A_count + C_G_count + G_C_count 1103 log.info(f'{A_T_count} ambiguities of A-T type.') 1104 log.info(f'{T_A_count} ambiguities of T-A type.') 1105 log.info(f'{C_G_count} ambiguities of C-G type.') 1106 log.info(f'{G_C_count} ambiguities of G-C type.') 1107 1108 # Filter out ambiguous variants and keep non-ambiguous ones 1109 log.debug(f'Removing {total_ambiguous} strand-ambiguous variants...') 1110 return self.filter_variants(indexes=non_ambiguous_idx, include=True, inplace=inplace)
A strand-ambiguous variant has reference (variants_ref
) and alternate (variants_alt
) alleles
in the pairs A/T, T/A, C/G, or G/C, where both alleles are complementary and thus indistinguishable
in terms of strand orientation.
Arguments:
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the strand-ambiguous variants removed. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with non-ambiguous variants only ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1112 def correct_flipped_variants( 1113 self, 1114 snpobj: 'SNPObject', 1115 check_complement: bool = True, 1116 index_by: str = 'pos', 1117 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1118 log_stats: bool = True, 1119 inplace: bool = False 1120 ) -> Optional['SNPObject']: 1121 """ 1122 Correct flipped variants between between `self` and a reference `snpobj`, where reference (`variants_ref`) 1123 and alternate (`variants_alt`) alleles are swapped. 1124 1125 **Flip Detection Based on `check_complement`:** 1126 1127 - If `check_complement=False`, only direct allele swaps are considered: 1128 1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1129 1130 - If `check_complement=True`, both direct and complementary swaps are considered, with four possible cases: 1131 1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1132 2. **Complement Swap of Ref:** `complement(self.variants_ref) == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1133 3. **Complement Swap of Alt:** `self.variants_ref == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`. 1134 4. **Complement Swap of both Ref and Alt:** `complement(self.variants_ref) == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`. 1135 1136 **Note:** Variants where `self.variants_ref == self.variants_alt` are ignored as they are ambiguous. 1137 1138 **Correction Process:** 1139 - Swaps `variants_ref` and `variants_alt` alleles in `self` to align with `snpobj`. 1140 - Flips `calldata_gt` values (0 becomes 1, and 1 becomes 0) to match the updated allele configuration. 1141 1142 Args: 1143 snpobj (SNPObject): 1144 The reference SNPObject to compare against. 1145 check_complement (bool, default=True): 1146 If True, also checks for complementary base pairs (A/T, T/A, C/G, and G/C) when identifying swapped variants. 1147 Default is True. 1148 index_by (str, default='pos'): 1149 Criteria for matching variants. Options: 1150 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1151 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1152 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1153 Default is 'pos'. 1154 common_variants_intersection (tuple of arrays, optional): 1155 Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 1156 computed within the function. 1157 log_stats (bool, default=True): 1158 If True, logs statistical information about matching and ambiguous alleles. Default is True. 1159 inplace (bool, default=False): 1160 If True, modifies `self` in place. If False, returns a new `SNPObject` with corrected 1161 flips. Default is False. 1162 1163 Returns: 1164 **Optional[SNPObject]**: 1165 A new `SNPObject` with corrected flips if `inplace=False`. 1166 If `inplace=True`, modifies `self` in place and returns None. 1167 """ 1168 # Define complement mappings for nucleotides 1169 complement_map = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'} 1170 1171 # Helper function to get the complement of a base 1172 def get_complement(base: str) -> str: 1173 return complement_map.get(base, base) 1174 1175 # Get common variant indices if not provided 1176 if common_variants_intersection != None: 1177 query_idx, reference_idx = common_variants_intersection 1178 else: 1179 _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by) 1180 1181 # Log statistics on matching alleles if enabled 1182 if log_stats: 1183 matching_ref = np.sum(self['variants_ref'][query_idx] == snpobj['variants_ref'][reference_idx]) 1184 matching_alt = np.sum(self['variants_alt'][query_idx] == snpobj['variants_alt'][reference_idx]) 1185 ambiguous = np.sum(self['variants_ref'][query_idx] == self['variants_alt'][query_idx]) 1186 log.info(f"Matching reference alleles (ref=ref'): {matching_ref}, Matching alternate alleles (alt=alt'): {matching_alt}.") 1187 log.info(f"Number of ambiguous alleles (ref=alt): {ambiguous}.") 1188 1189 # Identify indices where `ref` and `alt` alleles are swapped 1190 if not check_complement: 1191 # Simple exact match for swapped alleles 1192 swapped_ref = (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) 1193 swapped_alt = (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) 1194 else: 1195 # Check for swapped or complementary-swapped alleles 1196 swapped_ref = ( 1197 (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) | 1198 (np.vectorize(get_complement)(self['variants_ref'][query_idx]) == snpobj['variants_alt'][reference_idx]) 1199 ) 1200 swapped_alt = ( 1201 (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) | 1202 (np.vectorize(get_complement)(self['variants_alt'][query_idx]) == snpobj['variants_ref'][reference_idx]) 1203 ) 1204 1205 # Filter out ambiguous variants where `ref` and `alt` alleles match (ref=alt) 1206 not_ambiguous = (self['variants_ref'][query_idx] != self['variants_alt'][query_idx]) 1207 1208 # Indices in `self` of flipped variants 1209 flip_idx_query = query_idx[swapped_ref & swapped_alt & not_ambiguous] 1210 1211 # Correct the identified variant flips 1212 if len(flip_idx_query) > 0: 1213 log.info(f'Correcting {len(flip_idx_query)} variant flips...') 1214 1215 temp_alts = self['variants_alt'][flip_idx_query] 1216 temp_refs = self['variants_ref'][flip_idx_query] 1217 1218 # Correct the variant flips based on whether the operation is in-place or not 1219 if inplace: 1220 self['variants_alt'][flip_idx_query] = temp_refs 1221 self['variants_ref'][flip_idx_query] = temp_alts 1222 self['calldata_gt'][flip_idx_query] = 1 - self['calldata_gt'][flip_idx_query] 1223 return None 1224 else: 1225 snpobj = self.copy() 1226 snpobj['variants_alt'][flip_idx_query] = temp_refs 1227 snpobj['variants_ref'][flip_idx_query] = temp_alts 1228 snpobj['calldata_gt'][flip_idx_query] = 1 - snpobj['calldata_gt'][flip_idx_query] 1229 return snpobj 1230 else: 1231 log.info('No variant flips found to correct.') 1232 return self if not inplace else None
Correct flipped variants between between self
and a reference snpobj
, where reference (variants_ref
)
and alternate (variants_alt
) alleles are swapped.
Flip Detection Based on check_complement
:
If
check_complement=False
, only direct allele swaps are considered:- Direct Swap:
self.variants_ref == snpobj.variants_alt
andself.variants_alt == snpobj.variants_ref
.
- Direct Swap:
If
check_complement=True
, both direct and complementary swaps are considered, with four possible cases:- Direct Swap:
self.variants_ref == snpobj.variants_alt
andself.variants_alt == snpobj.variants_ref
. - Complement Swap of Ref:
complement(self.variants_ref) == snpobj.variants_alt
andself.variants_alt == snpobj.variants_ref
. - Complement Swap of Alt:
self.variants_ref == snpobj.variants_alt
andcomplement(self.variants_alt) == snpobj.variants_ref
. - Complement Swap of both Ref and Alt:
complement(self.variants_ref) == snpobj.variants_alt
andcomplement(self.variants_alt) == snpobj.variants_ref
.
- Direct Swap:
Note: Variants where self.variants_ref == self.variants_alt
are ignored as they are ambiguous.
Correction Process:
- Swaps
variants_ref
andvariants_alt
alleles inself
to align withsnpobj
. - Flips
calldata_gt
values (0 becomes 1, and 1 becomes 0) to match the updated allele configuration.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- check_complement (bool, default=True): If True, also checks for complementary base pairs (A/T, T/A, C/G, and G/C) when identifying swapped variants. Default is True.
- index_by (str, default='pos'): Criteria for matching variants. Options:
'pos'
: Matches by chromosome and position (variants_chrom
,variants_pos
), e.g., 'chr1-12345'.'id'
: Matches by variant ID alone (variants_id
), e.g., 'rs123'.'pos+id'
: Matches by chromosome, position, and ID (variants_chrom
,variants_pos
,variants_id
), e.g., 'chr1-12345-rs123'. Default is 'pos'.
- common_variants_intersection (tuple of arrays, optional): Precomputed indices of common variants between
self
andsnpobj
. If None, intersection is computed within the function. - log_stats (bool, default=True): If True, logs statistical information about matching and ambiguous alleles. Default is True.
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with corrected flips. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with corrected flips ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1234 def remove_mismatching_variants( 1235 self, 1236 snpobj: 'SNPObject', 1237 index_by: str = 'pos', 1238 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1239 inplace: bool = False 1240 ) -> Optional['SNPObject']: 1241 """ 1242 Remove variants from `self`, where reference (`variants_ref`) and/or alternate (`variants_alt`) alleles 1243 do not match with a reference `snpobj`. 1244 1245 Args: 1246 snpobj (SNPObject): 1247 The reference SNPObject to compare against. 1248 index_by (str, default='pos'): 1249 Criteria for matching variants. Options: 1250 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1251 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1252 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1253 Default is 'pos'. 1254 common_variants_intersection (tuple of arrays, optional): 1255 Precomputed indices of common variants between `self` and the reference `snpobj`. 1256 If None, the intersection is computed within the function. 1257 inplace (bool, default=False): 1258 If True, modifies `self` in place. If False, returns a new `SNPObject` without 1259 mismatching variants. Default is False. 1260 1261 Returns: 1262 **Optional[SNPObject]:** 1263 A new `SNPObject` without mismatching variants if `inplace=False`. 1264 If `inplace=True`, modifies `self` in place and returns None. 1265 """ 1266 # Get common variant indices if not provided 1267 if common_variants_intersection is not None: 1268 query_idx, reference_idx = common_variants_intersection 1269 else: 1270 _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by) 1271 1272 # Vectorized comparison of `ref` and `alt` alleles 1273 ref_mismatch = self['variants_ref'][query_idx] != snpobj['variants_ref'][reference_idx] 1274 alt_mismatch = self['variants_alt'][query_idx] != snpobj['variants_alt'][reference_idx] 1275 mismatch_mask = ref_mismatch | alt_mismatch 1276 1277 # Identify indices in `self` of mismatching variants 1278 mismatch_idx = query_idx[mismatch_mask] 1279 1280 # Compute total number of variant mismatches 1281 total_mismatches = np.sum(mismatch_mask) 1282 1283 # Filter out mismatching variants 1284 log.debug(f'Removing {total_mismatches} mismatching variants...') 1285 return self.filter_variants(indexes=mismatch_idx, include=True, inplace=inplace)
Remove variants from self
, where reference (variants_ref
) and/or alternate (variants_alt
) alleles
do not match with a reference snpobj
.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- index_by (str, default='pos'): Criteria for matching variants. Options:
'pos'
: Matches by chromosome and position (variants_chrom
,variants_pos
), e.g., 'chr1-12345'.'id'
: Matches by variant ID alone (variants_id
), e.g., 'rs123'.'pos+id'
: Matches by chromosome, position, and ID (variants_chrom
,variants_pos
,variants_id
), e.g., 'chr1-12345-rs123'. Default is 'pos'.
- common_variants_intersection (tuple of arrays, optional): Precomputed indices of common variants between
self
and the referencesnpobj
. If None, the intersection is computed within the function. - inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
without mismatching variants. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
without mismatching variants ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1287 def shuffle_variants(self, inplace: bool = False) -> Optional['SNPObject']: 1288 """ 1289 Randomly shuffle the positions of variants in the SNPObject, ensuring that all associated 1290 data (e.g., `calldata_gt` and variant-specific attributes) remain aligned. 1291 1292 Args: 1293 inplace (bool, default=False): 1294 If True, modifies `self` in place. If False, returns a new `SNPObject` with 1295 shuffled variants. Default is False. 1296 1297 Returns: 1298 **Optional[SNPObject]:** 1299 A new `SNPObject` without shuffled variant positions if `inplace=False`. 1300 If `inplace=True`, modifies `self` in place and returns None. 1301 """ 1302 # Generate a random permutation index for shuffling variant positions 1303 shuffle_index = np.random.permutation(self.n_snps) 1304 1305 # Apply shuffling to all relevant attributes using the class's dictionary-like interface 1306 if inplace: 1307 for key in self.keys(): 1308 if self[key] is not None: 1309 if key == 'calldata_gt': 1310 # `calldata_gt`` has a different shape, so it's shuffled along axis 0 1311 self[key] = self[key][shuffle_index, ...] 1312 elif 'variant' in key: 1313 # Other attributes are 1D arrays 1314 self[key] = np.asarray(self[key])[shuffle_index] 1315 return None 1316 else: 1317 shuffled_snpobj = self.copy() 1318 for key in shuffled_snpobj.keys(): 1319 if shuffled_snpobj[key] is not None: 1320 if key == 'calldata_gt': 1321 shuffled_snpobj[key] = shuffled_snpobj[key][shuffle_index, ...] 1322 elif 'variant' in key: 1323 shuffled_snpobj[key] = np.asarray(shuffled_snpobj[key])[shuffle_index] 1324 return shuffled_snpobj
Randomly shuffle the positions of variants in the SNPObject, ensuring that all associated
data (e.g., calldata_gt
and variant-specific attributes) remain aligned.
Arguments:
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with shuffled variants. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
without shuffled variant positions ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1326 def set_empty_to_missing(self, inplace: bool = False) -> Optional['SNPObject']: 1327 """ 1328 Replace empty strings `''` with missing values `'.'` in attributes of `self`. 1329 1330 Args: 1331 inplace (bool, default=False): 1332 If True, modifies `self` in place. If False, returns a new `SNPObject` with empty 1333 strings `''` replaced by missing values `'.'`. Default is False. 1334 1335 Returns: 1336 **Optional[SNPObject]:** 1337 A new `SNPObject` with empty strings replaced if `inplace=False`. 1338 If `inplace=True`, modifies `self` in place and returns None. 1339 """ 1340 if inplace: 1341 if self.variants_alt is not None: 1342 self.variants_alt[self.variants_alt == ''] = '.' 1343 if self.variants_ref is not None: 1344 self.variants_ref[self.variants_ref == ''] = '.' 1345 if self.variants_qual is not None: 1346 self.variants_qual = self.variants_qual.astype(str) 1347 self.variants_qual[(self.variants_qual == '') | (self.variants_qual == 'nan')] = '.' 1348 if self.variants_chrom is not None: 1349 self.variants_chrom = self.variants_chrom.astype(str) 1350 self.variants_chrom[self.variants_chrom == ''] = '.' 1351 if self.variants_filter_pass is not None: 1352 self.variants_filter_pass[self.variants_filter_pass == ''] = '.' 1353 if self.variants_id is not None: 1354 self.variants_id[self.variants_id == ''] = '.' 1355 return self 1356 else: 1357 snpobj = self.copy() 1358 if snpobj.variants_alt is not None: 1359 snpobj.variants_alt[snpobj.variants_alt == ''] = '.' 1360 if snpobj.variants_ref is not None: 1361 snpobj.variants_ref[snpobj.variants_ref == ''] = '.' 1362 if snpobj.variants_qual is not None: 1363 snpobj.variants_qual = snpobj.variants_qual.astype(str) 1364 snpobj.variants_qual[(snpobj.variants_qual == '') | (snpobj.variants_qual == 'nan')] = '.' 1365 if snpobj.variants_chrom is not None: 1366 snpobj.variants_chrom[snpobj.variants_chrom == ''] = '.' 1367 if snpobj.variants_filter_pass is not None: 1368 snpobj.variants_filter_pass[snpobj.variants_filter_pass == ''] = '.' 1369 if snpobj.variants_id is not None: 1370 snpobj.variants_id[snpobj.variants_id == ''] = '.' 1371 return snpobj
Replace empty strings ''
with missing values '.'
in attributes of self
.
Arguments:
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with empty strings''
replaced by missing values'.'
. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with empty strings replaced ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1373 def convert_to_window_level( 1374 self, 1375 window_size: Optional[int] = None, 1376 physical_pos: Optional[np.ndarray] = None, 1377 chromosomes: Optional[np.ndarray] = None, 1378 window_sizes: Optional[np.ndarray] = None, 1379 laiobj: Optional['LocalAncestryObject'] = None 1380 ) -> 'LocalAncestryObject': 1381 """ 1382 Aggregate the `calldata_lai` attribute into genomic windows within a 1383 `snputils.ancestry.genobj.LocalAncestryObject`. 1384 1385 **Options for defining windows (in order of precedence):** 1386 1387 1. **Fixed window size**: 1388 - Use `window_size` to specify how many SNPs go into each window. The last window on each 1389 chromosome may be larger if SNPs are not evenly divisible by the size. 1390 1391 2. **Custom start and end positions**: 1392 - Provide `physical_pos` (2D array of shape (n_windows, 2)) as the [start, end] base-pair 1393 coordinates for each window. 1394 - If `chromosomes` is not provided and `self` has exactly one chromosome, all windows are 1395 assumed to belong to that chromosome. 1396 - If multiple chromosomes exist but `chromosomes` is missing, an error will be raised. 1397 - Optionally, provide `window_sizes` to store the SNP count per-window. 1398 1399 3. **Matching existing windows**: 1400 - Reuse window definitions (`physical_pos`, `chromosomes`, `window_sizes`) from an existing `laiobj`. 1401 1402 Args: 1403 window_size (int, optional): 1404 Number of SNPs in each window if defining fixed-size windows. If the total number of 1405 SNPs in a chromosome is not evenly divisible by the window size, the last window on that 1406 chromosome will include all remaining SNPs and therefore be larger than the specified size. 1407 physical_pos (array of shape (n_windows, 2), optional): 1408 A 2D array containing the start and end physical positions for each window. 1409 chromosomes (array of shape (n_windows,), optional): 1410 An array with chromosome numbers corresponding to each genomic window. 1411 window_sizes (array of shape (n_windows,), optional): 1412 An array specifying the number of SNPs in each genomic window. 1413 laiobj (LocalAncestryObject, optional): 1414 A reference `LocalAncestryObject` from which to copy existing window definitions. 1415 1416 Returns: 1417 **LocalAncestryObject:** 1418 A LocalAncestryObject containing window-level ancestry data. 1419 """ 1420 from snputils.ancestry.genobj.local import LocalAncestryObject 1421 1422 if window_size is None and physical_pos is None and laiobj is None: 1423 raise ValueError("One of `window_size`, `physical_pos`, or `laiobj` must be provided.") 1424 1425 # 1. Fixed window size 1426 if window_size is not None: 1427 physical_pos = [] # Boundaries [start, end] of each window 1428 chromosomes = [] # Chromosome for each window 1429 window_sizes = [] # Number of SNPs for each window 1430 for chrom in self.unique_chrom: 1431 # Extract indices corresponding to this chromosome 1432 mask_chrom = (self.variants_chrom == chrom) 1433 # Subset to this chromosome 1434 pos_chrom = self.variants_pos[mask_chrom] 1435 # Number of SNPs for this chromosome 1436 n_snps_chrom = pos_chrom.size 1437 1438 # Initialize the start of the first window with the position of the first SNP 1439 current_start = self.variants_pos[0] 1440 1441 # Number of full windows with exactly `window_size` SNPs 1442 n_full_windows = n_snps_chrom // window_size 1443 1444 # Build all but the last window 1445 for i in range(n_full_windows-1): 1446 current_end = self.variants_pos[(i+1) * window_size - 1] 1447 physical_pos.append([current_start, current_end]) 1448 chromosomes.append(chrom) 1449 window_sizes.append(window_size) 1450 current_start = self.variants_pos[(i+1) * window_size] 1451 1452 # Build the last window 1453 current_end = self.variants_pos[-1] 1454 physical_pos.append([current_start, current_end]) 1455 chromosomes.append(chrom) 1456 window_sizes.append(n_snps_chrom - ((n_full_windows - 1) * window_size)) 1457 1458 physical_pos = np.array(physical_pos) 1459 chromosomes = np.array(chromosomes) 1460 window_sizes = np.array(window_sizes) 1461 1462 # 2. Custom start and end positions 1463 elif physical_pos is not None: 1464 # Check if there is exactly one chromosome 1465 if chromosomes is None: 1466 unique_chrom = self.unique_chrom 1467 if len(unique_chrom) == 1: 1468 # We assume all windows belong to this single chromosome 1469 single_chrom = unique_chrom[0] 1470 chromosomes = np.array([single_chrom] * physical_pos.shape[0]) 1471 else: 1472 raise ValueError("Multiple chromosomes detected, but `chromosomes` was not provided.") 1473 1474 # 3. Match existing windows to a reference laiobj 1475 elif laiobj is not None: 1476 physical_pos = laiobj.physical_pos 1477 chromosomes = laiobj.chromosomes 1478 window_sizes = laiobj.window_sizes 1479 1480 # Allocate an output LAI array 1481 n_windows = physical_pos.shape[0] 1482 n_samples = self.n_samples 1483 if len(self.calldata_lai.shape) == 3: 1484 lai = np.zeros((n_windows, n_samples, 2)) 1485 else: 1486 lai = np.zeros((n_windows, n_samples*2)) 1487 1488 # For each window, find the relevant SNPs and compute the mode of the ancestries 1489 for i, ((start, end), chrom) in enumerate(zip(physical_pos, chromosomes)): 1490 snps_mask = ( 1491 (self.variants_chrom == chrom) & 1492 (self.variants_pos >= start) & 1493 (self.variants_pos <= end) 1494 ) 1495 if np.any(snps_mask): 1496 lai_mask = self.calldata_lai[snps_mask, ...] 1497 mode_ancestries = mode(lai_mask, axis=0, nan_policy='omit').mode 1498 lai[i] = mode_ancestries 1499 else: 1500 lai[i] = np.nan 1501 1502 # Generate haplotype labels, e.g. "Sample1.0", "Sample1.1" 1503 haplotypes = [f"{sample}.{i}" for sample in self.samples for i in range(2)] 1504 1505 # If original data was (n_snps, n_samples, 2), flatten to (n_windows, n_samples*2) 1506 if len(self.calldata_lai.shape) == 3: 1507 lai = lai.reshape(n_windows, -1) 1508 1509 # Aggregate into a LocalAncestryObject 1510 return LocalAncestryObject( 1511 haplotypes=haplotypes, 1512 lai=lai, 1513 samples=self.samples, 1514 ancestry_map=self.ancestry_map, 1515 window_sizes=window_sizes, 1516 physical_pos=physical_pos, 1517 chromosomes=chromosomes 1518 )
Aggregate the calldata_lai
attribute into genomic windows within a
snputils.ancestry.genobj.LocalAncestryObject
.
Options for defining windows (in order of precedence):
- Fixed window size:
- Use
window_size
to specify how many SNPs go into each window. The last window on each chromosome may be larger if SNPs are not evenly divisible by the size.
- Custom start and end positions:
- Provide
physical_pos
(2D array of shape (n_windows, 2)) as the [start, end] base-pair coordinates for each window. - If
chromosomes
is not provided andself
has exactly one chromosome, all windows are assumed to belong to that chromosome. - If multiple chromosomes exist but
chromosomes
is missing, an error will be raised. - Optionally, provide
window_sizes
to store the SNP count per-window.
- Matching existing windows:
- Reuse window definitions (
physical_pos
,chromosomes
,window_sizes
) from an existinglaiobj
.
Arguments:
- window_size (int, optional): Number of SNPs in each window if defining fixed-size windows. If the total number of SNPs in a chromosome is not evenly divisible by the window size, the last window on that chromosome will include all remaining SNPs and therefore be larger than the specified size.
- physical_pos (array of shape (n_windows, 2), optional): A 2D array containing the start and end physical positions for each window.
- chromosomes (array of shape (n_windows,), optional): An array with chromosome numbers corresponding to each genomic window.
- window_sizes (array of shape (n_windows,), optional): An array specifying the number of SNPs in each genomic window.
- laiobj (LocalAncestryObject, optional): A reference
LocalAncestryObject
from which to copy existing window definitions.
Returns:
LocalAncestryObject: A LocalAncestryObject containing window-level ancestry data.
1520 def save(self, file: Union[str, Path]) -> None: 1521 """ 1522 Save the data stored in `self` to a specified file. 1523 1524 The format of the saved file is determined by the file extension provided in the `file` 1525 argument. 1526 1527 **Supported formats:** 1528 1529 - `.bed`: Binary PED (Plink) format. 1530 - `.pgen`: Plink2 binary genotype format. 1531 - `.vcf`: Variant Call Format. 1532 - `.pkl`: Pickle format for saving `self` in serialized form. 1533 1534 Args: 1535 file (str or pathlib.Path): 1536 Path to the file where the data will be saved. The extension of the file determines the save format. 1537 Supported extensions: `.bed`, `.pgen`, `.vcf`, `.pkl`. 1538 """ 1539 ext = Path(file).suffix.lower() 1540 if ext == '.bed': 1541 self.save_bed(file) 1542 elif ext == '.pgen': 1543 self.save_pgen(file) 1544 elif ext == '.vcf': 1545 self.save_vcf(file) 1546 elif ext == '.pkl': 1547 self.save_pickle(file) 1548 else: 1549 raise ValueError(f"Unsupported file extension: {ext}")
Save the data stored in self
to a specified file.
The format of the saved file is determined by the file extension provided in the file
argument.
Supported formats:
.bed
: Binary PED (Plink) format..pgen
: Plink2 binary genotype format..vcf
: Variant Call Format..pkl
: Pickle format for savingself
in serialized form.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. The extension of the file determines the save format.
Supported extensions:
.bed
,.pgen
,.vcf
,.pkl
.
1551 def save_bed(self, file: Union[str, Path]) -> None: 1552 """ 1553 Save the data stored in `self` to a `.bed` file. 1554 1555 Args: 1556 file (str or pathlib.Path): 1557 Path to the file where the data will be saved. It should end with `.bed`. 1558 If the provided path does not have this extension, it will be appended. 1559 """ 1560 from snputils.snp.io.write.bed import BEDWriter 1561 writer = BEDWriter(snpobj=self, filename=file) 1562 writer.write()
Save the data stored in self
to a .bed
file.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.bed
. If the provided path does not have this extension, it will be appended.
1564 def save_pgen(self, file: Union[str, Path]) -> None: 1565 """ 1566 Save the data stored in `self` to a `.pgen` file. 1567 1568 Args: 1569 file (str or pathlib.Path): 1570 Path to the file where the data will be saved. It should end with `.pgen`. 1571 If the provided path does not have this extension, it will be appended. 1572 """ 1573 from snputils.snp.io.write.pgen import PGENWriter 1574 writer = PGENWriter(snpobj=self, filename=file) 1575 writer.write()
Save the data stored in self
to a .pgen
file.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.pgen
. If the provided path does not have this extension, it will be appended.
1577 def save_vcf(self, file: Union[str, Path]) -> None: 1578 """ 1579 Save the data stored in `self` to a `.vcf` file. 1580 1581 Args: 1582 file (str or pathlib.Path): 1583 Path to the file where the data will be saved. It should end with `.vcf`. 1584 If the provided path does not have this extension, it will be appended. 1585 """ 1586 from snputils.snp.io.write.vcf import VCFWriter 1587 writer = VCFWriter(snpobj=self, filename=file) 1588 writer.write()
Save the data stored in self
to a .vcf
file.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.vcf
. If the provided path does not have this extension, it will be appended.
1590 def save_pickle(self, file: Union[str, Path]) -> None: 1591 """ 1592 Save `self` in serialized form to a `.pkl` file. 1593 1594 Args: 1595 file (str or pathlib.Path): 1596 Path to the file where the data will be saved. It should end with `.pkl`. 1597 If the provided path does not have this extension, it will be appended. 1598 """ 1599 import pickle 1600 with open(file, 'wb') as file: 1601 pickle.dump(self, file)
Save self
in serialized form to a .pkl
file.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.pkl
. If the provided path does not have this extension, it will be appended.