snputils.snp.genobj
17class SNPObject: 18 """ 19 A class for Single Nucleotide Polymorphism (SNP) data, with optional support for 20 SNP-level Local Ancestry Information (LAI). 21 """ 22 def __init__( 23 self, 24 calldata_gt: Optional[np.ndarray] = None, 25 samples: Optional[np.ndarray] = None, 26 variants_ref: Optional[np.ndarray] = None, 27 variants_alt: Optional[np.ndarray] = None, 28 variants_chrom: Optional[np.ndarray] = None, 29 variants_filter_pass: Optional[np.ndarray] = None, 30 variants_id: Optional[np.ndarray] = None, 31 variants_pos: Optional[np.ndarray] = None, 32 variants_qual: Optional[np.ndarray] = None, 33 calldata_lai: Optional[np.ndarray] = None, 34 ancestry_map: Optional[Dict[str, str]] = None 35 ) -> None: 36 """ 37 Args: 38 calldata_gt (array, optional): 39 An array containing genotype data for each sample. This array can be either 2D with shape 40 `(n_snps, n_samples)` if the paternal and maternal strands are summed, or 3D with shape 41 `(n_snps, n_samples, 2)` if the strands are kept separate. 42 samples (array of shape (n_sampels,), optional): 43 An array containing unique sample identifiers. 44 variants_ref (array of shape (n_snps,), optional): 45 An array containing the reference allele for each SNP. 46 variants_alt (array of shape (n_snps,), optional): 47 An array containing the alternate allele for each SNP. 48 variants_chrom (array of shape (n_snps,), optional): 49 An array containing the chromosome for each SNP. 50 variants_filter_pass (array of shape (n_snps,), optional): 51 An array indicating whether each SNP passed control checks. 52 variants_id (array of shape (n_snps,), optional): 53 An array containing unique identifiers (IDs) for each SNP. 54 variants_pos (array of shape (n_snps,), optional): 55 An array containing the chromosomal positions for each SNP. 56 variants_qual (array of shape (n_snps,), optional): 57 An array containing the Phred-scaled quality score for each SNP. 58 calldata_lai (array, optional): 59 An array containing the ancestry for each SNP. This array can be either 2D with shape 60 `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2). 61 ancestry_map (dict of str to str, optional): 62 A dictionary mapping ancestry codes to region names. 63 """ 64 self.__calldata_gt = calldata_gt 65 self.__samples = samples 66 self.__variants_ref = variants_ref 67 self.__variants_alt = variants_alt 68 self.__variants_chrom = variants_chrom 69 self.__variants_filter_pass = variants_filter_pass 70 self.__variants_id = variants_id 71 self.__variants_pos = variants_pos 72 self.__variants_qual = variants_qual 73 self.__calldata_lai = calldata_lai 74 self.__ancestry_map = ancestry_map 75 76 self._sanity_check() 77 78 def __getitem__(self, key: str) -> Any: 79 """ 80 To access an attribute of the class using the square bracket notation, 81 similar to a dictionary. 82 """ 83 try: 84 return getattr(self, key) 85 except: 86 raise KeyError(f'Invalid key: {key}.') 87 88 def __setitem__(self, key: str, value: Any): 89 """ 90 To set an attribute of the class using the square bracket notation, 91 similar to a dictionary. 92 """ 93 try: 94 setattr(self, key, value) 95 except: 96 raise KeyError(f'Invalid key: {key}.') 97 98 @property 99 def calldata_gt(self) -> np.ndarray: 100 """ 101 Retrieve `calldata_gt`. 102 103 Returns: 104 **array:** 105 An array containing genotype data for each sample. This array can be either 2D with shape 106 `(n_snps, n_samples)` if the paternal and maternal strands are summed, or 3D with shape 107 `(n_snps, n_samples, 2)` if the strands are kept separate. 108 """ 109 return self.__calldata_gt 110 111 @calldata_gt.setter 112 def calldata_gt(self, x: np.ndarray): 113 """ 114 Update `calldata_gt`. 115 """ 116 self.__calldata_gt = x 117 118 @property 119 def samples(self) -> Optional[np.ndarray]: 120 """ 121 Retrieve `samples`. 122 123 Returns: 124 **array of shape (n_sampels,):** 125 An array containing unique sample identifiers. 126 """ 127 return self.__samples 128 129 @samples.setter 130 def samples(self, x: Union[List, np.ndarray]): 131 """ 132 Update `samples`. 133 """ 134 self.__samples = np.asarray(x) 135 136 @property 137 def variants_ref(self) -> Optional[np.ndarray]: 138 """ 139 Retrieve `variants_ref`. 140 141 Returns: 142 **array of shape (n_snps,):** An array containing the reference allele for each SNP. 143 """ 144 return self.__variants_ref 145 146 @variants_ref.setter 147 def variants_ref(self, x: np.ndarray): 148 """ 149 Update `variants_ref`. 150 """ 151 self.__variants_ref = x 152 153 @property 154 def variants_alt(self) -> Optional[np.ndarray]: 155 """ 156 Retrieve `variants_alt`. 157 158 Returns: 159 **array of shape (n_snps,):** An array containing the alternate allele for each SNP. 160 """ 161 return self.__variants_alt 162 163 @variants_alt.setter 164 def variants_alt(self, x: np.ndarray): 165 """ 166 Update `variants_alt`. 167 """ 168 self.__variants_alt = x 169 170 @property 171 def variants_chrom(self) -> Optional[np.ndarray]: 172 """ 173 Retrieve `variants_chrom`. 174 175 Returns: 176 **array of shape (n_snps,):** An array containing the chromosome for each SNP. 177 """ 178 return self.__variants_chrom 179 180 @variants_chrom.setter 181 def variants_chrom(self, x: np.ndarray): 182 """ 183 Update `variants_chrom`. 184 """ 185 self.__variants_chrom = x 186 187 @property 188 def variants_filter_pass(self) -> Optional[np.ndarray]: 189 """ 190 Retrieve `variants_filter_pass`. 191 192 Returns: 193 **array of shape (n_snps,):** An array indicating whether each SNP passed control checks. 194 """ 195 return self.__variants_filter_pass 196 197 @variants_filter_pass.setter 198 def variants_filter_pass(self, x: np.ndarray): 199 """ 200 Update `variants_filter_pass`. 201 """ 202 self.__variants_filter_pass = x 203 204 @property 205 def variants_id(self) -> Optional[np.ndarray]: 206 """ 207 Retrieve `variants_id`. 208 209 Returns: 210 **array of shape (n_snps,):** An array containing unique identifiers (IDs) for each SNP. 211 """ 212 return self.__variants_id 213 214 @variants_id.setter 215 def variants_id(self, x: np.ndarray): 216 """ 217 Update `variants_id`. 218 """ 219 self.__variants_id = x 220 221 @property 222 def variants_pos(self) -> Optional[np.ndarray]: 223 """ 224 Retrieve `variants_pos`. 225 226 Returns: 227 **array of shape (n_snps,):** An array containing the chromosomal positions for each SNP. 228 """ 229 return self.__variants_pos 230 231 @variants_pos.setter 232 def variants_pos(self, x: np.ndarray): 233 """ 234 Update `variants_pos`. 235 """ 236 self.__variants_pos = x 237 238 @property 239 def variants_qual(self) -> Optional[np.ndarray]: 240 """ 241 Retrieve `variants_qual`. 242 243 Returns: 244 **array of shape (n_snps,):** An array containing the Phred-scaled quality score for each SNP. 245 """ 246 return self.__variants_qual 247 248 @variants_qual.setter 249 def variants_qual(self, x: np.ndarray): 250 """ 251 Update `variants_qual`. 252 """ 253 self.__variants_qual = x 254 255 @property 256 def calldata_lai(self) -> Optional[np.ndarray]: 257 """ 258 Retrieve `calldata_lai`. 259 260 Returns: 261 **array:** 262 An array containing the ancestry for each SNP. This array can be either 2D with shape 263 `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2). 264 """ 265 return self.__calldata_lai 266 267 @calldata_lai.setter 268 def calldata_lai(self, x: np.ndarray): 269 """ 270 Update `calldata_lai`. 271 """ 272 self.__calldata_lai = x 273 274 @property 275 def ancestry_map(self) -> Optional[Dict[str, str]]: 276 """ 277 Retrieve `ancestry_map`. 278 279 Returns: 280 **dict of str to str:** A dictionary mapping ancestry codes to region names. 281 """ 282 return self.__ancestry_map 283 284 @ancestry_map.setter 285 def ancestry_map(self, x): 286 """ 287 Update `ancestry_map`. 288 """ 289 self.__ancestry_map = x 290 291 @property 292 def n_samples(self) -> int: 293 """ 294 Retrieve `n_samples`. 295 296 Returns: 297 **int:** The total number of samples. 298 """ 299 if self.__samples is not None: 300 return len(self.__samples) 301 elif self.__calldata_gt is not None: 302 return self.__calldata_gt.shape[1] 303 elif self.__calldata_lai is not None: 304 if self.__calldata_lai.ndim == 2: 305 return self.__calldata_lai.shape[1] // 2 306 elif self.__calldata_lai.ndim == 3: 307 return self.__calldata_lai.shape[1] 308 else: 309 raise ValueError("Unable to determine the total number of samples: no relevant data is available.") 310 311 @property 312 def n_snps(self) -> int: 313 """ 314 Retrieve `n_snps`. 315 316 Returns: 317 **int:** The total number of SNPs. 318 """ 319 # List of attributes that can indicate the number of SNPs 320 potential_attributes = [ 321 self.__calldata_gt, 322 self.__variants_ref, 323 self.__variants_alt, 324 self.__variants_chrom, 325 self.__variants_filter_pass, 326 self.__variants_id, 327 self.__variants_pos, 328 self.__variants_qual, 329 self.__calldata_lai 330 ] 331 332 # Check each attribute for its first dimension, which corresponds to `n_snps` 333 for attr in potential_attributes: 334 if attr is not None: 335 return attr.shape[0] 336 337 raise ValueError("Unable to determine the total number of SNPs: no relevant data is available.") 338 339 @property 340 def n_chrom(self) -> Optional[int]: 341 """ 342 Retrieve `n_chrom`. 343 344 Returns: 345 **int:** The total number of unique chromosomes in `variants_chrom`. 346 """ 347 if self.variants_chrom is None: 348 warnings.warn("Chromosome data `variants_chrom` is None.") 349 return None 350 351 return len(self.unique_chrom) 352 353 @property 354 def n_ancestries(self) -> int: 355 """ 356 Retrieve `n_ancestries`. 357 358 Returns: 359 **int:** The total number of unique ancestries. 360 """ 361 if self.__calldata_lai is not None: 362 return len(np.unique(self.__calldata_lai)) 363 else: 364 raise ValueError("Unable to determine the total number of ancestries: no relevant data is available.") 365 366 @property 367 def unique_chrom(self) -> Optional[np.ndarray]: 368 """ 369 Retrieve `unique_chrom`. 370 371 Returns: 372 **array:** The unique chromosome names in `variants_chrom`, preserving their order of appearance. 373 """ 374 if self.variants_chrom is None: 375 warnings.warn("Chromosome data `variants_chrom` is None.") 376 return None 377 378 # Identify unique chromosome names and their first indexes of occurrence 379 _, idx = np.unique(self.variants_chrom, return_index=True) 380 # Return chromosome names sorted by their first occurrence to maintain original order 381 return self.variants_chrom[np.sort(idx)] 382 383 @property 384 def are_strands_summed(self) -> bool: 385 """ 386 Retrieve `are_strands_summed`. 387 388 Returns: 389 **bool:** 390 True if the maternal and paternal strands have been summed together, which is indicated by 391 `calldata_gt` having shape `(n_samples, n_snps)`. False if the strands are stored separately, 392 indicated by `calldata_gt` having shape `(n_samples, n_snps, 2)`. 393 """ 394 if self.calldata_gt is None: 395 warnings.warn("Genotype data `calldata_gt` is None.") 396 return None 397 398 return self.calldata_gt.ndim == 2 399 400 def copy(self) -> 'SNPObject': 401 """ 402 Create and return a copy of `self`. 403 404 Returns: 405 **SNPObject:** 406 A new instance of the current object. 407 """ 408 return copy.deepcopy(self) 409 410 def keys(self) -> List[str]: 411 """ 412 Retrieve a list of public attribute names for `self`. 413 414 Returns: 415 **list of str:** 416 A list of attribute names, with internal name-mangling removed, 417 for easier reference to public attributes in the instance. 418 """ 419 return [attr.replace('_SNPObject__', '') for attr in vars(self)] 420 421 def sum_strands(self, inplace: bool = False) -> Optional['SNPObject']: 422 """ 423 Sum paternal and maternal strands. 424 425 Args: 426 inplace (bool, default=False): 427 If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 428 filtered. Default is False. 429 430 Returns: 431 **Optional[SNPObject]:** 432 A new `SNPObject` with summed strands if `inplace=False`. 433 If `inplace=True`, modifies `self` in place and returns None. 434 """ 435 if self.calldata_gt is None: 436 warnings.warn("Genotype data `calldata_gt` is None.") 437 return None if not inplace else self 438 439 if self.are_strands_summed: 440 warnings.warn("Genotype data `calldata_gt` is already summed.") 441 return self if inplace else self.copy() 442 443 if inplace: 444 self.calldata_gt = self.calldata_gt.sum(axis=2, dtype=np.int8) 445 return self 446 else: 447 snpobj = self.copy() 448 snpobj.calldata_gt = self.calldata_gt.sum(axis=2, dtype=np.int8) 449 return snpobj 450 451 def filter_variants( 452 self, 453 chrom: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 454 pos: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 455 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 456 include: bool = True, 457 inplace: bool = False 458 ) -> Optional['SNPObject']: 459 """ 460 Filter variants based on specified chromosome names, variant positions, or variant indexes. 461 462 This method updates the `calldata_gt`, `variants_ref`, `variants_alt`, 463 `variants_chrom`, `variants_filter_pass`, `variants_id`, `variants_pos`, 464 `variants_qual`, and `lai` attributes to include or exclude the specified variants. The filtering 465 criteria can be based on chromosome names, variant positions, or indexes. If multiple 466 criteria are provided, their union is used for filtering. The order of the variants is preserved. 467 468 Negative indexes are supported and follow 469 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 470 471 Args: 472 chrom (str or array_like of str, optional): 473 Chromosome(s) to filter variants by. Can be a single chromosome as a string or a sequence 474 of chromosomes. If both `chrom` and `pos` are provided, they must either have matching lengths 475 (pairing each chromosome with a position) or `chrom` should be a single value that applies to 476 all positions in `pos`. Default is None. 477 pos (int or array_like of int, optional): 478 Position(s) to filter variants by. Can be a single position as an integer or a sequence of positions. 479 If `chrom` is also provided, `pos` should either match `chrom` in length or `chrom` should be a 480 single value. Default is None. 481 indexes (int or array_like of int, optional): 482 Index(es) of the variants to include or exclude. Can be a single index or a sequence 483 of indexes. Negative indexes are supported. Default is None. 484 include (bool, default=True): 485 If True, includes only the specified variants. If False, excludes the specified 486 variants. Default is True. 487 inplace (bool, default=False): 488 If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 489 filtered. Default is False. 490 491 Returns: 492 **Optional[SNPObject]:** 493 A new `SNPObject` with the specified variants filtered if `inplace=False`. 494 If `inplace=True`, modifies `self` in place and returns None. 495 """ 496 if chrom is None and pos is None and indexes is None: 497 raise ValueError("At least one of 'chrom', 'pos', or 'indexes' must be provided.") 498 499 n_snps = self.n_snps 500 501 # Convert inputs to arrays for consistency 502 chrom = np.atleast_1d(chrom) if chrom is not None else None 503 pos = np.atleast_1d(pos) if pos is not None else None 504 indexes = np.atleast_1d(indexes) if indexes is not None else None 505 506 # Validate chrom and pos lengths if both are provided 507 if chrom is not None and pos is not None: 508 if len(chrom) != len(pos) and len(chrom) > 1: 509 raise ValueError( 510 "When both 'chrom' and 'pos' are provided, they must either be of the same length " 511 "or 'chrom' must be a single value." 512 ) 513 514 # Create a mask for chromosome and position filtering 515 mask_combined = np.zeros(n_snps, dtype=bool) 516 if chrom is not None and pos is not None: 517 if len(chrom) == 1: 518 # Apply single chromosome to all positions in `pos` 519 mask_combined = (self['variants_chrom'] == chrom[0]) & np.isin(self['variants_pos'], pos) 520 else: 521 # Vectorized pair matching for chrom and pos 522 query_pairs = np.array( 523 list(zip(chrom, pos)), 524 dtype=[ 525 ('chrom', self['variants_chrom'].dtype), 526 ('pos', self['variants_pos'].dtype) 527 ] 528 ) 529 data_pairs = np.array( 530 list(zip(self['variants_chrom'], self['variants_pos'])), 531 dtype=[ 532 ('chrom', self['variants_chrom'].dtype), 533 ('pos', self['variants_pos'].dtype) 534 ] 535 ) 536 mask_combined = np.isin(data_pairs, query_pairs) 537 538 elif chrom is not None: 539 # Only chromosome filtering 540 mask_combined = np.isin(self['variants_chrom'], chrom) 541 elif pos is not None: 542 # Only position filtering 543 mask_combined = np.isin(self['variants_pos'], pos) 544 545 # Create mask based on indexes if provided 546 if indexes is not None: 547 # Validate indexes, allowing negative indexes 548 out_of_bounds_indexes = indexes[(indexes < -n_snps) | (indexes >= n_snps)] 549 if out_of_bounds_indexes.size > 0: 550 raise ValueError(f"One or more sample indexes are out of bounds.") 551 552 # Handle negative indexes and check for out-of-bounds indexes 553 adjusted_indexes = np.mod(indexes, n_snps) 554 555 # Create mask for specified indexes 556 mask_indexes = np.zeros(n_snps, dtype=bool) 557 mask_indexes[adjusted_indexes] = True 558 559 # Combine with `chrom` and `pos` mask using logical OR (union of all specified criteria) 560 mask_combined = mask_combined | mask_indexes 561 562 # Invert mask if `include` is False 563 if not include: 564 mask_combined = ~mask_combined 565 566 # Define keys to filter 567 keys = [ 568 'calldata_gt', 'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 569 'variants_id', 'variants_pos', 'variants_qual', 'calldata_lai' 570 ] 571 572 # Apply filtering based on inplace parameter 573 if inplace: 574 for key in keys: 575 if self[key] is not None: 576 if self[key].ndim > 1: 577 self[key] = np.asarray(self[key])[mask_combined, ...] 578 else: 579 self[key] = np.asarray(self[key])[mask_combined] 580 581 return None 582 else: 583 # Create A new `SNPObject` with filtered data 584 snpobj = self.copy() 585 for key in keys: 586 if snpobj[key] is not None: 587 if snpobj[key].ndim > 1: 588 snpobj[key] = np.asarray(snpobj[key])[mask_combined, ...] 589 else: 590 snpobj[key] = np.asarray(snpobj[key])[mask_combined] 591 592 return snpobj 593 594 def filter_samples( 595 self, 596 samples: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 597 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 598 include: bool = True, 599 inplace: bool = False 600 ) -> Optional['SNPObject']: 601 """ 602 Filter samples based on specified names or indexes. 603 604 This method updates the `samples` and `calldata_gt` attributes to include or exclude the specified 605 samples. The order of the samples is preserved. 606 607 If both samples and indexes are provided, any sample matching either a name in samples or an index in 608 indexes will be included or excluded. 609 610 This method allows inclusion or exclusion of specific samples by their names or 611 indexes. When both sample names and indexes are provided, the union of the specified samples 612 is used. Negative indexes are supported and follow 613 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 614 615 Args: 616 samples (str or array_like of str, optional): 617 Name(s) of the samples to include or exclude. Can be a single sample name or a 618 sequence of sample names. Default is None. 619 indexes (int or array_like of int, optional): 620 Index(es) of the samples to include or exclude. Can be a single index or a sequence 621 of indexes. Negative indexes are supported. Default is None. 622 include (bool, default=True): 623 If True, includes only the specified samples. If False, excludes the specified 624 samples. Default is True. 625 inplace (bool, default=False): 626 If True, modifies `self` in place. If False, returns a new `SNPObject` with the samples 627 filtered. Default is False. 628 629 Returns: 630 **Optional[SNPObject]:** 631 A new `SNPObject` with the specified samples filtered if `inplace=False`. 632 If `inplace=True`, modifies `self` in place and returns None. 633 """ 634 if samples is None and indexes is None: 635 raise ValueError("At least one of 'samples' or 'indexes' must be provided.") 636 637 n_samples = self.n_samples 638 sample_names = np.array(self['samples']) 639 640 # Create mask based on sample names 641 if samples is not None: 642 samples = np.atleast_1d(samples) 643 mask_samples = np.isin(sample_names, samples) 644 missing_samples = samples[~np.isin(samples, sample_names)] 645 if missing_samples.size > 0: 646 raise ValueError(f"The following specified samples were not found: {missing_samples.tolist()}") 647 else: 648 mask_samples = np.zeros(n_samples, dtype=bool) 649 650 # Create mask based on sample indexes 651 if indexes is not None: 652 indexes = np.atleast_1d(indexes) 653 654 # Validate indexes, allowing negative indexes 655 out_of_bounds_indexes = indexes[(indexes < -n_samples) | (indexes >= n_samples)] 656 if out_of_bounds_indexes.size > 0: 657 raise ValueError(f"One or more sample indexes are out of bounds.") 658 659 # Handle negative indexes 660 adjusted_indexes = np.mod(indexes, n_samples) 661 662 mask_indexes = np.zeros(n_samples, dtype=bool) 663 mask_indexes[adjusted_indexes] = True 664 else: 665 mask_indexes = np.zeros(n_samples, dtype=bool) 666 667 # Combine masks using logical OR (union of samples) 668 mask_combined = mask_samples | mask_indexes 669 670 if not include: 671 mask_combined = ~mask_combined 672 673 # Define keys to filter 674 keys = ['samples', 'calldata_gt', 'calldata_lai'] 675 676 # Apply filtering based on inplace parameter 677 if inplace: 678 for key in keys: 679 if self[key] is not None: 680 if self[key].ndim > 1: 681 self[key] = np.asarray(self[key])[:, mask_combined, ...] 682 else: 683 self[key] = np.asarray(self[key])[mask_combined] 684 685 return None 686 else: 687 # Create A new `SNPObject` with filtered data 688 snpobj = self.copy() 689 for key in keys: 690 if snpobj[key] is not None: 691 if snpobj[key].ndim > 1: 692 snpobj[key] = np.asarray(snpobj[key])[:, mask_combined, ...] 693 else: 694 snpobj[key] = np.asarray(snpobj[key])[mask_combined] 695 return snpobj 696 697 def detect_chromosome_format(self) -> str: 698 """ 699 Detect the chromosome naming convention in `variants_chrom` based on the prefix 700 of the first chromosome identifier in `unique_chrom`. 701 702 **Recognized formats:** 703 704 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 705 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 706 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 707 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 708 709 If the format does not match any recognized pattern, `'Unknown format'` is returned. 710 711 Returns: 712 **str:** 713 A string indicating the detected chromosome format (`'chr'`, `'chm'`, `'chrom'`, or `'plain'`). 714 If no recognized format is matched, returns `'Unknown format'`. 715 """ 716 # Select the first unique chromosome identifier for format detection 717 chromosome_str = self.unique_chrom[0] 718 719 # Define regular expressions to match each recognized chromosome format 720 patterns = { 721 'chr': r'^chr(\d+|X|Y|M)$', # Matches 'chr' prefixed format 722 'chm': r'^chm(\d+|X|Y|M)$', # Matches 'chm' prefixed format 723 'chrom': r'^chrom(\d+|X|Y|M)$', # Matches 'chrom' prefixed format 724 'plain': r'^(\d+|X|Y|M)$' # Matches plain format without prefix 725 } 726 727 # Iterate through the patterns to identify the chromosome format 728 for prefix, pattern in patterns.items(): 729 if re.match(pattern, chromosome_str): 730 return prefix # Return the recognized format prefix 731 732 # If no pattern matches, return 'Unknown format' 733 return 'Unknown format' 734 735 def convert_chromosome_format( 736 self, 737 from_format: str, 738 to_format: str, 739 inplace: bool = False 740 ) -> Optional['SNPObject']: 741 """ 742 Convert the chromosome format from one naming convention to another in `variants_chrom`. 743 744 **Supported formats:** 745 746 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 747 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 748 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 749 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 750 751 Args: 752 from_format (str): 753 The current chromosome format. Acceptable values are `'chr'`, `'chm'`, `'chrom'`, or `'plain'`. 754 to_format (str): 755 The target format for chromosome data conversion. Acceptable values match `from_format` options. 756 inplace (bool, default=False): 757 If True, modifies `self` in place. If False, returns a new `SNPObject` with the converted format. 758 Default is False. 759 760 Returns: 761 **Optional[SNPObject]:** A new `SNPObject` with the converted chromosome format if `inplace=False`. 762 If `inplace=True`, modifies `self` in place and returns None. 763 """ 764 # Define the list of standard chromosome identifiers 765 chrom_list = [*map(str, range(1, 23)), 'X', 'Y', 'M'] # M for mitochondrial chromosomes 766 767 # Format mappings for different chromosome naming conventions 768 format_mappings = { 769 'chr': [f'chr{i}' for i in chrom_list], 770 'chm': [f'chm{i}' for i in chrom_list], 771 'chrom': [f'chrom{i}' for i in chrom_list], 772 'plain': chrom_list, 773 } 774 775 # Verify that from_format and to_format are valid naming conventions 776 if from_format not in format_mappings or to_format not in format_mappings: 777 raise ValueError(f"Invalid format: {from_format} or {to_format}. Must be one of {list(format_mappings.keys())}.") 778 779 # Convert chromosomes to string for consistent comparison 780 variants_chrom = self['variants_chrom'].astype(str) 781 782 # Verify that all chromosomes in the object follow the specified `from_format` 783 expected_chroms = set(format_mappings[from_format]) 784 mismatched_chroms = set(variants_chrom) - expected_chroms 785 786 if mismatched_chroms: 787 raise ValueError(f"The following chromosomes do not match the `from_format` '{from_format}': {mismatched_chroms}.") 788 789 # Create conditions for selecting based on current `from_format` names 790 conditions = [variants_chrom == chrom for chrom in format_mappings[from_format]] 791 792 # Rename chromosomes based on inplace flag 793 if inplace: 794 self['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown') 795 return None 796 else: 797 snpobject = self.copy() 798 snpobject['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown') 799 return snpobject 800 801 def match_chromosome_format(self, snpobj: 'SNPObject', inplace: bool = False) -> Optional['SNPObject']: 802 """ 803 Convert the chromosome format in `variants_chrom` from `self` to match the format of a reference `snpobj`. 804 805 **Recognized formats:** 806 807 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 808 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 809 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 810 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 811 812 Args: 813 snpobj (SNPObject): 814 The reference SNPObject whose chromosome format will be matched. 815 inplace (bool, default=False): 816 If True, modifies `self` in place. If False, returns a new `SNPObject` with the 817 chromosome format matching that of `snpobj`. Default is False. 818 819 Returns: 820 **Optional[SNPObject]:** 821 A new `SNPObject` with matched chromosome format if `inplace=False`. 822 If `inplace=True`, modifies `self` in place and returns None. 823 """ 824 # Detect the chromosome naming format of the current SNPObject 825 fmt1 = self.detect_chromosome_format() 826 if fmt1 == 'Unknown format': 827 raise ValueError("The chromosome format of the current SNPObject is unrecognized.") 828 829 # Detect the chromosome naming format of the reference SNPObject 830 fmt2 = snpobj.detect_chromosome_format() 831 if fmt2 == 'Unknown format': 832 raise ValueError("The chromosome format of the reference SNPObject is unrecognized.") 833 834 # Convert the current SNPObject's chromosome format to match the reference format 835 return self.convert_chromosome_format(fmt1, fmt2, inplace=inplace) 836 837 def rename_chrom( 838 self, 839 to_replace: Union[Dict[str, str], str, List[str]] = {'^([0-9]+)$': r'chr\1', r'^chr([0-9]+)$': r'\1'}, 840 value: Optional[Union[str, List[str]]] = None, 841 regex: bool = True, 842 inplace: bool = False 843 ) -> Optional['SNPObject']: 844 """ 845 Replace chromosome values in `variants_chrom` using patterns or exact matches. 846 847 This method allows flexible chromosome replacements, using regex or exact matches, useful 848 for non-standard chromosome formats. For standard conversions (e.g., 'chr1' to '1'), 849 consider `convert_chromosome_format`. 850 851 Args: 852 to_replace (dict, str, or list of str): 853 Pattern(s) or exact value(s) to be replaced in chromosome names. Default behavior 854 transforms `<chrom_num>` to `chr<chrom_num>` or vice versa. Non-matching values 855 remain unchanged. 856 - If str or list of str: Matches will be replaced with `value`. 857 - If regex (bool), then any regex matches will be replaced with `value`. 858 - If dict: Keys defines values to replace, with corresponding replacements as values. 859 value (str or list of str, optional): 860 Replacement value(s) if `to_replace` is a string or list. Ignored if `to_replace` 861 is a dictionary. 862 regex (bool, default=True): 863 If True, interprets `to_replace` keys as regex patterns. 864 inplace (bool, default=False): 865 If True, modifies `self` in place. If False, returns a new `SNPObject` with the chromosomes 866 renamed. Default is False. 867 868 Returns: 869 **Optional[SNPObject]:** A new `SNPObject` with the renamed chromosome format if `inplace=False`. 870 If `inplace=True`, modifies `self` in place and returns None. 871 """ 872 # Standardize input format: convert `to_replace` and `value` to a dictionary if needed 873 if isinstance(to_replace, (str, int)): 874 to_replace = [to_replace] 875 if isinstance(value, (str, int)): 876 value = [value] 877 if isinstance(to_replace, list) and isinstance(value, list): 878 dictionary = dict(zip(to_replace, value)) 879 elif isinstance(to_replace, dict) and value is None: 880 dictionary = to_replace 881 else: 882 raise ValueError( 883 "Invalid input: `to_replace` and `value` must be compatible types (both str, list of str, or dict)." 884 ) 885 886 # Vectorized function for replacing values in chromosome array 887 vec_replace_values = np.vectorize(self._match_to_replace) 888 889 # Rename chromosomes based on inplace flag 890 if inplace: 891 self.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex) 892 return None 893 else: 894 snpobj = self.copy() 895 snpobj.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex) 896 return snpobj 897 898 def rename_missings( 899 self, 900 before: Union[int, float, str] = -1, 901 after: Union[int, float, str] = '.', 902 inplace: bool = False 903 ) -> Optional['SNPObject']: 904 """ 905 Replace missing values in the `calldata_gt` attribute. 906 907 This method identifies missing values in 'calldata_gt' and replaces them with a specified 908 value. By default, it replaces occurrences of `-1` (often used to signify missing data) with `'.'`. 909 910 Args: 911 before (int, float, or str, default=-1): 912 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 913 Default is -1. 914 after (int, float, or str, default='.'): 915 The value that will replace `before`. Default is '.'. 916 inplace (bool, default=False): 917 If True, modifies `self` in place. If False, returns a new `SNPObject` with the applied 918 replacements. Default is False. 919 920 Returns: 921 **Optional[SNPObject]:** 922 A new `SNPObject` with the renamed missing values if `inplace=False`. 923 If `inplace=True`, modifies `self` in place and returns None. 924 """ 925 # Rename missing values in the `calldata_gt` attribute based on inplace flag 926 if inplace: 927 self['calldata_gt'] = np.where(self['calldata_gt'] == before, after, self['calldata_gt']) 928 return None 929 else: 930 snpobj = self.copy() 931 snpobj['calldata_gt'] = np.where(snpobj['calldata_gt'] == before, after, snpobj['calldata_gt']) 932 return snpobj 933 934 def get_common_variants_intersection( 935 self, 936 snpobj: 'SNPObject', 937 index_by: str = 'pos' 938 ) -> Tuple[List[str], np.ndarray, np.ndarray]: 939 """ 940 Identify common variants between `self` and the `snpobj` instance based on the specified `index_by` criterion, 941 which may match based on chromosome and position (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both. 942 943 This method returns the identifiers of common variants and their corresponding indices in both objects. 944 945 Args: 946 snpobj (SNPObject): 947 The reference SNPObject to compare against. 948 index_by (str, default='pos'): 949 Criteria for matching variants. Options: 950 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 951 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 952 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 953 Default is 'pos'. 954 955 Returns: 956 Tuple containing: 957 - **list of str:** A list of common variant identifiers (as strings). 958 - **array:** An array of indices in `self` where common variants are located. 959 - **array:** An array of indices in `snpobj` where common variants are located. 960 """ 961 # Create unique identifiers for each variant in both SNPObjects based on the specified criterion 962 if index_by == 'pos': 963 query_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(self['variants_chrom'], self['variants_pos'])] 964 reference_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(snpobj['variants_chrom'], snpobj['variants_pos'])] 965 elif index_by == 'id': 966 query_identifiers = self['variants_id'].tolist() 967 reference_identifiers = snpobj['variants_id'].tolist() 968 elif index_by == 'pos+id': 969 query_identifiers = [ 970 f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(self['variants_chrom'], self['variants_pos'], self['variants_id']) 971 ] 972 reference_identifiers = [ 973 f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_id']) 974 ] 975 else: 976 raise ValueError("`index_by` must be one of 'pos', 'id', or 'pos+id'.") 977 978 # Convert to sets for intersection 979 common_ids = set(query_identifiers).intersection(reference_identifiers) 980 981 # Collect indices for common identifiers 982 query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids] 983 reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids] 984 985 return list(common_ids), np.array(query_idx), np.array(reference_idx) 986 987 def get_common_markers_intersection( 988 self, 989 snpobj: 'SNPObject' 990 ) -> Tuple[List[str], np.ndarray, np.ndarray]: 991 """ 992 Identify common markers between between `self` and the `snpobj` instance. Common markers are identified 993 based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 994 and alternate (`variants_alt`) alleles. 995 996 This method returns the identifiers of common markers and their corresponding indices in both objects. 997 998 Args: 999 snpobj (SNPObject): 1000 The reference SNPObject to compare against. 1001 1002 Returns: 1003 Tuple containing: 1004 - **list of str:** A list of common variant identifiers (as strings). 1005 - **array:** An array of indices in `self` where common variants are located. 1006 - **array:** An array of indices in `snpobj` where common variants are located. 1007 """ 1008 # Generate unique identifiers based on chrom, pos, ref, and alt alleles 1009 query_identifiers = [ 1010 f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 1011 zip(self['variants_chrom'], self['variants_pos'], self['variants_ref'], self['variants_alt']) 1012 ] 1013 reference_identifiers = [ 1014 f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 1015 zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_ref'], snpobj['variants_alt']) 1016 ] 1017 1018 # Convert to sets for intersection 1019 common_ids = set(query_identifiers).intersection(reference_identifiers) 1020 1021 # Collect indices for common identifiers in both SNPObjects 1022 query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids] 1023 reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids] 1024 1025 return list(common_ids), np.array(query_idx), np.array(reference_idx) 1026 1027 def subset_to_common_variants( 1028 self, 1029 snpobj: 'SNPObject', 1030 index_by: str = 'pos', 1031 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1032 inplace: bool = False 1033 ) -> Optional['SNPObject']: 1034 """ 1035 Subset `self` to include only the common variants with a reference `snpobj` based on 1036 the specified `index_by` criterion, which may match based on chromosome and position 1037 (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both. 1038 1039 Args: 1040 snpobj (SNPObject): 1041 The reference SNPObject to compare against. 1042 index_by (str, default='pos'): 1043 Criteria for matching variants. Options: 1044 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1045 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1046 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1047 Default is 'pos'. 1048 common_variants_intersection (Tuple[np.ndarray, np.ndarray], optional): 1049 Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 1050 computed within the function. 1051 inplace (bool, default=False): 1052 If True, modifies `self` in place. If False, returns a new `SNPObject` with the common variants 1053 subsetted. Default is False. 1054 1055 Returns: 1056 **Optional[SNPObject]:** 1057 A new `SNPObject` with the common variants subsetted if `inplace=False`. 1058 If `inplace=True`, modifies `self` in place and returns None. 1059 """ 1060 # Get indices of common variants if not provided 1061 if common_variants_intersection is None: 1062 _, query_idx, _ = self.get_common_variants_intersection(snpobj, index_by=index_by) 1063 else: 1064 query_idx, _ = common_variants_intersection 1065 1066 # Use filter_variants method with the identified indices, applying `inplace` as specified 1067 return self.filter_variants(indexes=query_idx, include=True, inplace=inplace) 1068 1069 def subset_to_common_markers( 1070 self, 1071 snpobj: 'SNPObject', 1072 common_markers_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1073 inplace: bool = False 1074 ) -> Optional['SNPObject']: 1075 """ 1076 Subset `self` to include only the common markers with a reference `snpobj`. Common markers are identified 1077 based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 1078 and alternate (`variants_alt`) alleles. 1079 1080 Args: 1081 snpobj (SNPObject): 1082 The reference SNPObject to compare against. 1083 common_markers_intersection (tuple of arrays, optional): 1084 Precomputed indices of common markers between `self` and `snpobj`. If None, intersection is 1085 computed within the function. 1086 inplace (bool, default=False): 1087 If True, modifies `self` in place. If False, returns a new `SNPObject` with the common markers 1088 subsetted. Default is False. 1089 1090 Returns: 1091 **Optional[SNPObject]:** 1092 A new `SNPObject` with the common markers subsetted if `inplace=False`. 1093 If `inplace=True`, modifies `self` in place and returns None. 1094 """ 1095 # Get indices of common markers if not provided 1096 if common_markers_intersection is None: 1097 _, query_idx, _ = self.get_common_markers_intersection(snpobj) 1098 else: 1099 query_idx, _ = common_markers_intersection 1100 1101 # Use filter_variants method with the identified indices, applying `inplace` as specified 1102 return self.filter_variants(indexes=query_idx, include=True, inplace=inplace) 1103 1104 def merge( 1105 self, 1106 snpobj: 'SNPObject', 1107 force_samples: bool = False, 1108 prefix: str = '2', 1109 inplace: bool = False 1110 ) -> Optional['SNPObject']: 1111 """ 1112 Merge `self` with `snpobj` along the sample axis. 1113 1114 This method expects both SNPObjects to contain the same set of SNPs in the same order, 1115 then combines their genotype (`calldata_gt`) and LAI (`calldata_lai`) arrays by 1116 concatenating the sample dimension. Samples from `snpobj` are appended to those in `self`. 1117 1118 Args: 1119 snpobj (SNPObject): 1120 The SNPObject to merge samples with. 1121 force_samples (bool, default=False): 1122 If True, duplicate sample names are resolved by prepending the `prefix` to duplicate sample names in 1123 `snpobj`. Otherwise, merging fails when duplicate sample names are found. Default is False. 1124 prefix (str, default='2'): 1125 A string prepended to duplicate sample names in `snpobj` when `force_samples=True`. 1126 Duplicates are renamed from `<sample_name>` to `<prefix>:<sample_name>`. For instance, 1127 if `prefix='2'` and there is a conflict with a sample called "sample_1", it becomes "2:sample_1". 1128 inplace (bool, default=False): 1129 If True, modifies `self` in place. If False, returns a new `SNPObject` with the merged samples. 1130 Default is False. 1131 1132 Returns: 1133 **Optional[SNPObject]**: A new SNPObject containing the merged sample data. 1134 """ 1135 # Merge calldata_gt if present and compatible 1136 if self.calldata_gt is not None and snpobj.calldata_gt is not None: 1137 if self.calldata_gt.shape[0] != snpobj.calldata_gt.shape[0]: 1138 raise ValueError( 1139 f"Cannot merge SNPObjects: Mismatch in the number of SNPs in `calldata_gt`.\n" 1140 f"`self.calldata_gt` has {self.calldata_gt.shape[0]} SNPs, " 1141 f"while `snpobj.calldata_gt` has {snpobj.calldata_gt.shape[0]} SNPs." 1142 ) 1143 if self.are_strands_summed and not snpobj.are_strands_summed: 1144 raise ValueError( 1145 "Cannot merge SNPObjects: `self` has summed strands, but `snpobj` does not.\n" 1146 "Ensure both objects have the same genotype summation state before merging." 1147 ) 1148 if not self.are_strands_summed and snpobj.are_strands_summed: 1149 raise ValueError( 1150 "Cannot merge SNPObjects: `snpobj` has summed strands, but `self` does not.\n" 1151 "Ensure both objects have the same genotype summation state before merging." 1152 ) 1153 calldata_gt = np.concatenate([self.calldata_gt, snpobj.calldata_gt], axis=1) 1154 else: 1155 calldata_gt = None 1156 1157 # Merge samples if present and compatible, handling duplicates if `force_samples=True` 1158 if self.samples is not None and snpobj.samples is not None: 1159 overlapping_samples = set(self.samples).intersection(set(snpobj.samples)) 1160 if overlapping_samples: 1161 if not force_samples: 1162 raise ValueError( 1163 f"Cannot merge SNPObjects: Found overlapping sample names {overlapping_samples}.\n" 1164 "Samples must be strictly non-overlapping. To allow merging with renaming, set `force_samples=True`." 1165 ) 1166 else: 1167 # Rename duplicate samples by prepending the file index 1168 renamed_samples = [f"{prefix}:{sample}" if sample in overlapping_samples else sample for sample in snpobj.samples] 1169 samples = np.concatenate([self.samples, renamed_samples], axis=0) 1170 else: 1171 samples = np.concatenate([self.samples, snpobj.samples], axis=0) 1172 else: 1173 samples = None 1174 1175 # Merge LAI data if present and compatible 1176 if self.calldata_lai is not None and snpobj.calldata_lai is not None: 1177 if self.calldata_lai.ndim != snpobj.calldata_lai.ndim: 1178 raise ValueError( 1179 f"Cannot merge SNPObjects: Mismatch in `calldata_lai` dimensions.\n" 1180 f"`self.calldata_lai` has {self.calldata_lai.ndim} dimensions, " 1181 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.ndim} dimensions." 1182 ) 1183 if self.calldata_lai.shape[0] != snpobj.calldata_lai.shape[0]: 1184 raise ValueError( 1185 f"Cannot merge SNPObjects: Mismatch in the number of SNPs in `calldata_lai`.\n" 1186 f"`self.calldata_lai` has {self.calldata_lai.shape[0]} SNPs, " 1187 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.shape[0]} SNPs." 1188 ) 1189 calldata_lai = np.concatenate([self.calldata_lai, snpobj.calldata_lai], axis=1) 1190 else: 1191 calldata_lai = None 1192 1193 if inplace: 1194 self.calldata_gt = calldata_gt 1195 self.calldata_lai = calldata_lai 1196 self.samples = samples 1197 return self 1198 1199 # Create and return a new SNPObject containing the merged samples 1200 return SNPObject( 1201 calldata_gt=calldata_gt, 1202 samples=samples, 1203 variants_ref=self.variants_ref, 1204 variants_alt=self.variants_alt, 1205 variants_chrom=self.variants_chrom, 1206 variants_filter_pass=self.variants_filter_pass, 1207 variants_id=self.variants_id, 1208 variants_pos=self.variants_pos, 1209 variants_qual=self.variants_qual, 1210 calldata_lai=calldata_lai, 1211 ancestry_map=self.ancestry_map 1212 ) 1213 1214 def concat( 1215 self, 1216 snpobj: 'SNPObject', 1217 inplace: bool = False 1218 ) -> Optional['SNPObject']: 1219 """ 1220 Concatenate self with snpobj along the SNP axis. 1221 1222 This method expects both SNPObjects to contain the same set of samples in the same order, 1223 and that the chromosome(s) in snpobj follow (i.e. have higher numeric identifiers than) 1224 those in self. 1225 1226 Args: 1227 snpobj (SNPObject): 1228 The SNPObject to concatenate SNPs with. 1229 inplace (bool, default=False): 1230 If True, modifies `self` in place. If False, returns a new `SNPObject` with the concatenated SNPs. 1231 Default is False. 1232 1233 Returns: 1234 **Optional[SNPObject]**: A new SNPObject containing the concatenated SNP data. 1235 """ 1236 # Merge calldata_gt if present and compatible 1237 if self.calldata_gt is not None and snpobj.calldata_gt is not None: 1238 if self.calldata_gt.shape[1] != snpobj.calldata_gt.shape[1]: 1239 raise ValueError( 1240 f"Cannot merge SNPObjects: Mismatch in the number of samples in `calldata_gt`.\n" 1241 f"`self.calldata_gt` has {self.calldata_gt.shape[1]} samples, " 1242 f"while `snpobj.calldata_gt` has {snpobj.calldata_gt.shape[1]} samples." 1243 ) 1244 if self.are_strands_summed and not snpobj.are_strands_summed: 1245 raise ValueError( 1246 "Cannot merge SNPObjects: `self` has summed strands, but `snpobj` does not.\n" 1247 "Ensure both objects have the same genotype summation state before merging." 1248 ) 1249 if not self.are_strands_summed and snpobj.are_strands_summed: 1250 raise ValueError( 1251 "Cannot merge SNPObjects: `snpobj` has summed strands, but `self` does not.\n" 1252 "Ensure both objects have the same genotype summation state before merging." 1253 ) 1254 calldata_gt = np.concatenate([self.calldata_gt, snpobj.calldata_gt], axis=0) 1255 else: 1256 calldata_gt = None 1257 1258 # Merge SNP-related attributes if present 1259 attributes = [ 1260 'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 'variants_id', 'variants_pos', 'variants_qual' 1261 ] 1262 merged_attrs = {} 1263 for attr in attributes: 1264 self_attr = getattr(self, attr, None) 1265 obj_attr = getattr(snpobj, attr, None) 1266 1267 # Concatenate if both present 1268 if self_attr is not None and obj_attr is not None: 1269 merged_attrs[attr] = np.concatenate([self_attr, obj_attr], axis=0) 1270 else: 1271 # If either is None, store None 1272 merged_attrs[attr] = None 1273 1274 # Merge LAI data if present and compatible 1275 if self.calldata_lai is not None and snpobj.calldata_lai is not None: 1276 if self.calldata_lai.ndim != snpobj.calldata_lai.ndim: 1277 raise ValueError( 1278 f"Cannot merge SNPObjects: Mismatch in `calldata_lai` dimensions.\n" 1279 f"`self.calldata_lai` has {self.calldata_lai.ndim} dimensions, " 1280 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.ndim} dimensions." 1281 ) 1282 if self.calldata_lai.shape[1] != snpobj.calldata_lai.shape[1]: 1283 raise ValueError( 1284 f"Cannot merge SNPObjects: Mismatch in the number of samples in `calldata_lai`.\n" 1285 f"`self.calldata_lai` has {self.calldata_lai.shape[1]} samples, " 1286 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.shape[1]} samples." 1287 ) 1288 calldata_lai = np.concatenate([self.calldata_lai, snpobj.calldata_lai], axis=0) 1289 else: 1290 calldata_lai = None 1291 1292 if inplace: 1293 self.calldata_gt = calldata_gt 1294 self.calldata_lai = calldata_lai 1295 for attr in attributes: 1296 self[attr] = merged_attrs[attr] 1297 return self 1298 1299 # Create and return a new SNPObject containing the concatenated SNPs 1300 return SNPObject( 1301 calldata_gt=calldata_gt, 1302 calldata_lai=calldata_lai, 1303 samples=self.samples, 1304 variants_ref=merged_attrs['variants_ref'], 1305 variants_alt=merged_attrs['variants_alt'], 1306 variants_chrom=merged_attrs['variants_chrom'], 1307 variants_id=merged_attrs['variants_id'], 1308 variants_pos=merged_attrs['variants_pos'], 1309 variants_qual=merged_attrs['variants_qual'], 1310 variants_filter_pass=merged_attrs['variants_filter_pass'], 1311 ancestry_map=self.ancestry_map 1312 ) 1313 1314 def remove_strand_ambiguous_variants(self, inplace: bool = False) -> Optional['SNPObject']: 1315 """ 1316 A strand-ambiguous variant has reference (`variants_ref`) and alternate (`variants_alt`) alleles 1317 in the pairs A/T, T/A, C/G, or G/C, where both alleles are complementary and thus indistinguishable 1318 in terms of strand orientation. 1319 1320 Args: 1321 inplace (bool, default=False): 1322 If True, modifies `self` in place. If False, returns a new `SNPObject` with the 1323 strand-ambiguous variants removed. Default is False. 1324 1325 Returns: 1326 **Optional[SNPObject]:** A new `SNPObject` with non-ambiguous variants only if `inplace=False`. 1327 If `inplace=True`, modifies `self` in place and returns None. 1328 """ 1329 # Identify strand-ambiguous SNPs using vectorized comparisons 1330 is_AT = (self['variants_ref'] == 'A') & (self['variants_alt'] == 'T') 1331 is_TA = (self['variants_ref'] == 'T') & (self['variants_alt'] == 'A') 1332 is_CG = (self['variants_ref'] == 'C') & (self['variants_alt'] == 'G') 1333 is_GC = (self['variants_ref'] == 'G') & (self['variants_alt'] == 'C') 1334 1335 # Create a combined mask for all ambiguous variants 1336 ambiguous_mask = is_AT | is_TA | is_CG | is_GC 1337 non_ambiguous_idx = np.where(~ambiguous_mask)[0] 1338 1339 # Count each type of ambiguity using numpy's sum on boolean arrays 1340 A_T_count = np.sum(is_AT) 1341 T_A_count = np.sum(is_TA) 1342 C_G_count = np.sum(is_CG) 1343 G_C_count = np.sum(is_GC) 1344 1345 # Log the counts of each type of strand-ambiguous variants 1346 total_ambiguous = A_T_count + T_A_count + C_G_count + G_C_count 1347 log.info(f'{A_T_count} ambiguities of A-T type.') 1348 log.info(f'{T_A_count} ambiguities of T-A type.') 1349 log.info(f'{C_G_count} ambiguities of C-G type.') 1350 log.info(f'{G_C_count} ambiguities of G-C type.') 1351 1352 # Filter out ambiguous variants and keep non-ambiguous ones 1353 log.debug(f'Removing {total_ambiguous} strand-ambiguous variants...') 1354 return self.filter_variants(indexes=non_ambiguous_idx, include=True, inplace=inplace) 1355 1356 def correct_flipped_variants( 1357 self, 1358 snpobj: 'SNPObject', 1359 check_complement: bool = True, 1360 index_by: str = 'pos', 1361 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1362 log_stats: bool = True, 1363 inplace: bool = False 1364 ) -> Optional['SNPObject']: 1365 """ 1366 Correct flipped variants between between `self` and a reference `snpobj`, where reference (`variants_ref`) 1367 and alternate (`variants_alt`) alleles are swapped. 1368 1369 **Flip Detection Based on `check_complement`:** 1370 1371 - If `check_complement=False`, only direct allele swaps are considered: 1372 1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1373 1374 - If `check_complement=True`, both direct and complementary swaps are considered, with four possible cases: 1375 1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1376 2. **Complement Swap of Ref:** `complement(self.variants_ref) == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1377 3. **Complement Swap of Alt:** `self.variants_ref == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`. 1378 4. **Complement Swap of both Ref and Alt:** `complement(self.variants_ref) == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`. 1379 1380 **Note:** Variants where `self.variants_ref == self.variants_alt` are ignored as they are ambiguous. 1381 1382 **Correction Process:** 1383 - Swaps `variants_ref` and `variants_alt` alleles in `self` to align with `snpobj`. 1384 - Flips `calldata_gt` values (0 becomes 1, and 1 becomes 0) to match the updated allele configuration. 1385 1386 Args: 1387 snpobj (SNPObject): 1388 The reference SNPObject to compare against. 1389 check_complement (bool, default=True): 1390 If True, also checks for complementary base pairs (A/T, T/A, C/G, and G/C) when identifying swapped variants. 1391 Default is True. 1392 index_by (str, default='pos'): 1393 Criteria for matching variants. Options: 1394 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1395 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1396 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1397 Default is 'pos'. 1398 common_variants_intersection (tuple of arrays, optional): 1399 Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 1400 computed within the function. 1401 log_stats (bool, default=True): 1402 If True, logs statistical information about matching and ambiguous alleles. Default is True. 1403 inplace (bool, default=False): 1404 If True, modifies `self` in place. If False, returns a new `SNPObject` with corrected 1405 flips. Default is False. 1406 1407 Returns: 1408 **Optional[SNPObject]**: 1409 A new `SNPObject` with corrected flips if `inplace=False`. 1410 If `inplace=True`, modifies `self` in place and returns None. 1411 """ 1412 # Define complement mappings for nucleotides 1413 complement_map = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'} 1414 1415 # Helper function to get the complement of a base 1416 def get_complement(base: str) -> str: 1417 return complement_map.get(base, base) 1418 1419 # Get common variant indices if not provided 1420 if common_variants_intersection != None: 1421 query_idx, reference_idx = common_variants_intersection 1422 else: 1423 _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by) 1424 1425 # Log statistics on matching alleles if enabled 1426 if log_stats: 1427 matching_ref = np.sum(self['variants_ref'][query_idx] == snpobj['variants_ref'][reference_idx]) 1428 matching_alt = np.sum(self['variants_alt'][query_idx] == snpobj['variants_alt'][reference_idx]) 1429 ambiguous = np.sum(self['variants_ref'][query_idx] == self['variants_alt'][query_idx]) 1430 log.info(f"Matching reference alleles (ref=ref'): {matching_ref}, Matching alternate alleles (alt=alt'): {matching_alt}.") 1431 log.info(f"Number of ambiguous alleles (ref=alt): {ambiguous}.") 1432 1433 # Identify indices where `ref` and `alt` alleles are swapped 1434 if not check_complement: 1435 # Simple exact match for swapped alleles 1436 swapped_ref = (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) 1437 swapped_alt = (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) 1438 else: 1439 # Check for swapped or complementary-swapped alleles 1440 swapped_ref = ( 1441 (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) | 1442 (np.vectorize(get_complement)(self['variants_ref'][query_idx]) == snpobj['variants_alt'][reference_idx]) 1443 ) 1444 swapped_alt = ( 1445 (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) | 1446 (np.vectorize(get_complement)(self['variants_alt'][query_idx]) == snpobj['variants_ref'][reference_idx]) 1447 ) 1448 1449 # Filter out ambiguous variants where `ref` and `alt` alleles match (ref=alt) 1450 not_ambiguous = (self['variants_ref'][query_idx] != self['variants_alt'][query_idx]) 1451 1452 # Indices in `self` of flipped variants 1453 flip_idx_query = query_idx[swapped_ref & swapped_alt & not_ambiguous] 1454 1455 # Correct the identified variant flips 1456 if len(flip_idx_query) > 0: 1457 log.info(f'Correcting {len(flip_idx_query)} variant flips...') 1458 1459 temp_alts = self['variants_alt'][flip_idx_query] 1460 temp_refs = self['variants_ref'][flip_idx_query] 1461 1462 # Correct the variant flips based on whether the operation is in-place or not 1463 if inplace: 1464 self['variants_alt'][flip_idx_query] = temp_refs 1465 self['variants_ref'][flip_idx_query] = temp_alts 1466 self['calldata_gt'][flip_idx_query] = 1 - self['calldata_gt'][flip_idx_query] 1467 return None 1468 else: 1469 snpobj = self.copy() 1470 snpobj['variants_alt'][flip_idx_query] = temp_refs 1471 snpobj['variants_ref'][flip_idx_query] = temp_alts 1472 snpobj['calldata_gt'][flip_idx_query] = 1 - snpobj['calldata_gt'][flip_idx_query] 1473 return snpobj 1474 else: 1475 log.info('No variant flips found to correct.') 1476 return self if not inplace else None 1477 1478 def remove_mismatching_variants( 1479 self, 1480 snpobj: 'SNPObject', 1481 index_by: str = 'pos', 1482 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1483 inplace: bool = False 1484 ) -> Optional['SNPObject']: 1485 """ 1486 Remove variants from `self`, where reference (`variants_ref`) and/or alternate (`variants_alt`) alleles 1487 do not match with a reference `snpobj`. 1488 1489 Args: 1490 snpobj (SNPObject): 1491 The reference SNPObject to compare against. 1492 index_by (str, default='pos'): 1493 Criteria for matching variants. Options: 1494 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1495 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1496 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1497 Default is 'pos'. 1498 common_variants_intersection (tuple of arrays, optional): 1499 Precomputed indices of common variants between `self` and the reference `snpobj`. 1500 If None, the intersection is computed within the function. 1501 inplace (bool, default=False): 1502 If True, modifies `self` in place. If False, returns a new `SNPObject` without 1503 mismatching variants. Default is False. 1504 1505 Returns: 1506 **Optional[SNPObject]:** 1507 A new `SNPObject` without mismatching variants if `inplace=False`. 1508 If `inplace=True`, modifies `self` in place and returns None. 1509 """ 1510 # Get common variant indices if not provided 1511 if common_variants_intersection is not None: 1512 query_idx, reference_idx = common_variants_intersection 1513 else: 1514 _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by) 1515 1516 # Vectorized comparison of `ref` and `alt` alleles 1517 ref_mismatch = self['variants_ref'][query_idx] != snpobj['variants_ref'][reference_idx] 1518 alt_mismatch = self['variants_alt'][query_idx] != snpobj['variants_alt'][reference_idx] 1519 mismatch_mask = ref_mismatch | alt_mismatch 1520 1521 # Identify indices in `self` of mismatching variants 1522 mismatch_idx = query_idx[mismatch_mask] 1523 1524 # Compute total number of variant mismatches 1525 total_mismatches = np.sum(mismatch_mask) 1526 1527 # Filter out mismatching variants 1528 log.debug(f'Removing {total_mismatches} mismatching variants...') 1529 return self.filter_variants(indexes=mismatch_idx, include=True, inplace=inplace) 1530 1531 def shuffle_variants(self, inplace: bool = False) -> Optional['SNPObject']: 1532 """ 1533 Randomly shuffle the positions of variants in the SNPObject, ensuring that all associated 1534 data (e.g., `calldata_gt` and variant-specific attributes) remain aligned. 1535 1536 Args: 1537 inplace (bool, default=False): 1538 If True, modifies `self` in place. If False, returns a new `SNPObject` with 1539 shuffled variants. Default is False. 1540 1541 Returns: 1542 **Optional[SNPObject]:** 1543 A new `SNPObject` without shuffled variant positions if `inplace=False`. 1544 If `inplace=True`, modifies `self` in place and returns None. 1545 """ 1546 # Generate a random permutation index for shuffling variant positions 1547 shuffle_index = np.random.permutation(self.n_snps) 1548 1549 # Apply shuffling to all relevant attributes using the class's dictionary-like interface 1550 if inplace: 1551 for key in self.keys(): 1552 if self[key] is not None: 1553 if key == 'calldata_gt': 1554 # `calldata_gt`` has a different shape, so it's shuffled along axis 0 1555 self[key] = self[key][shuffle_index, ...] 1556 elif 'variant' in key: 1557 # snpobj attributes are 1D arrays 1558 self[key] = np.asarray(self[key])[shuffle_index] 1559 return None 1560 else: 1561 shuffled_snpobj = self.copy() 1562 for key in shuffled_snpobj.keys(): 1563 if shuffled_snpobj[key] is not None: 1564 if key == 'calldata_gt': 1565 shuffled_snpobj[key] = shuffled_snpobj[key][shuffle_index, ...] 1566 elif 'variant' in key: 1567 shuffled_snpobj[key] = np.asarray(shuffled_snpobj[key])[shuffle_index] 1568 return shuffled_snpobj 1569 1570 def set_empty_to_missing(self, inplace: bool = False) -> Optional['SNPObject']: 1571 """ 1572 Replace empty strings `''` with missing values `'.'` in attributes of `self`. 1573 1574 Args: 1575 inplace (bool, default=False): 1576 If True, modifies `self` in place. If False, returns a new `SNPObject` with empty 1577 strings `''` replaced by missing values `'.'`. Default is False. 1578 1579 Returns: 1580 **Optional[SNPObject]:** 1581 A new `SNPObject` with empty strings replaced if `inplace=False`. 1582 If `inplace=True`, modifies `self` in place and returns None. 1583 """ 1584 if inplace: 1585 if self.variants_alt is not None: 1586 self.variants_alt[self.variants_alt == ''] = '.' 1587 if self.variants_ref is not None: 1588 self.variants_ref[self.variants_ref == ''] = '.' 1589 if self.variants_qual is not None: 1590 self.variants_qual = self.variants_qual.astype(str) 1591 self.variants_qual[(self.variants_qual == '') | (self.variants_qual == 'nan')] = '.' 1592 if self.variants_chrom is not None: 1593 self.variants_chrom = self.variants_chrom.astype(str) 1594 self.variants_chrom[self.variants_chrom == ''] = '.' 1595 if self.variants_filter_pass is not None: 1596 self.variants_filter_pass[self.variants_filter_pass == ''] = '.' 1597 if self.variants_id is not None: 1598 self.variants_id[self.variants_id == ''] = '.' 1599 return self 1600 else: 1601 snpobj = self.copy() 1602 if snpobj.variants_alt is not None: 1603 snpobj.variants_alt[snpobj.variants_alt == ''] = '.' 1604 if snpobj.variants_ref is not None: 1605 snpobj.variants_ref[snpobj.variants_ref == ''] = '.' 1606 if snpobj.variants_qual is not None: 1607 snpobj.variants_qual = snpobj.variants_qual.astype(str) 1608 snpobj.variants_qual[(snpobj.variants_qual == '') | (snpobj.variants_qual == 'nan')] = '.' 1609 if snpobj.variants_chrom is not None: 1610 snpobj.variants_chrom[snpobj.variants_chrom == ''] = '.' 1611 if snpobj.variants_filter_pass is not None: 1612 snpobj.variants_filter_pass[snpobj.variants_filter_pass == ''] = '.' 1613 if snpobj.variants_id is not None: 1614 snpobj.variants_id[snpobj.variants_id == ''] = '.' 1615 return snpobj 1616 1617 def convert_to_window_level( 1618 self, 1619 window_size: Optional[int] = None, 1620 physical_pos: Optional[np.ndarray] = None, 1621 chromosomes: Optional[np.ndarray] = None, 1622 window_sizes: Optional[np.ndarray] = None, 1623 laiobj: Optional['LocalAncestryObject'] = None 1624 ) -> 'LocalAncestryObject': 1625 """ 1626 Aggregate the `calldata_lai` attribute into genomic windows within a 1627 `snputils.ancestry.genobj.LocalAncestryObject`. 1628 1629 **Options for defining windows (in order of precedence):** 1630 1631 1. **Fixed window size**: 1632 - Use `window_size` to specify how many SNPs go into each window. The last window on each 1633 chromosome may be larger if SNPs are not evenly divisible by the size. 1634 1635 2. **Custom start and end positions**: 1636 - Provide `physical_pos` (2D array of shape (n_windows, 2)) as the [start, end] base-pair 1637 coordinates for each window. 1638 - If `chromosomes` is not provided and `self` has exactly one chromosome, all windows are 1639 assumed to belong to that chromosome. 1640 - If multiple chromosomes exist but `chromosomes` is missing, an error will be raised. 1641 - Optionally, provide `window_sizes` to store the SNP count per-window. 1642 1643 3. **Matching existing windows**: 1644 - Reuse window definitions (`physical_pos`, `chromosomes`, `window_sizes`) from an existing `laiobj`. 1645 1646 Args: 1647 window_size (int, optional): 1648 Number of SNPs in each window if defining fixed-size windows. If the total number of 1649 SNPs in a chromosome is not evenly divisible by the window size, the last window on that 1650 chromosome will include all remaining SNPs and therefore be larger than the specified size. 1651 physical_pos (array of shape (n_windows, 2), optional): 1652 A 2D array containing the start and end physical positions for each window. 1653 chromosomes (array of shape (n_windows,), optional): 1654 An array with chromosome numbers corresponding to each genomic window. 1655 window_sizes (array of shape (n_windows,), optional): 1656 An array specifying the number of SNPs in each genomic window. 1657 laiobj (LocalAncestryObject, optional): 1658 A reference `LocalAncestryObject` from which to copy existing window definitions. 1659 1660 Returns: 1661 **LocalAncestryObject:** 1662 A LocalAncestryObject containing window-level ancestry data. 1663 """ 1664 from snputils.ancestry.genobj.local import LocalAncestryObject 1665 1666 if window_size is None and physical_pos is None and laiobj is None: 1667 raise ValueError("One of `window_size`, `physical_pos`, or `laiobj` must be provided.") 1668 1669 # Fixed window size 1670 if window_size is not None: 1671 physical_pos = [] # Boundaries [start, end] of each window 1672 chromosomes = [] # Chromosome for each window 1673 window_sizes = [] # Number of SNPs for each window 1674 for chrom in self.unique_chrom: 1675 # Extract indices corresponding to this chromosome 1676 mask_chrom = (self.variants_chrom == chrom) 1677 # Subset to this chromosome 1678 pos_chrom = self.variants_pos[mask_chrom] 1679 # Number of SNPs for this chromosome 1680 n_snps_chrom = pos_chrom.size 1681 1682 # Initialize the start of the first window with the position of the first SNP 1683 current_start = self.variants_pos[0] 1684 1685 # Number of full windows with exactly `window_size` SNPs 1686 n_full_windows = n_snps_chrom // window_size 1687 1688 # Build all but the last window 1689 for i in range(n_full_windows-1): 1690 current_end = self.variants_pos[(i+1) * window_size - 1] 1691 physical_pos.append([current_start, current_end]) 1692 chromosomes.append(chrom) 1693 window_sizes.append(window_size) 1694 current_start = self.variants_pos[(i+1) * window_size] 1695 1696 # Build the last window 1697 current_end = self.variants_pos[-1] 1698 physical_pos.append([current_start, current_end]) 1699 chromosomes.append(chrom) 1700 window_sizes.append(n_snps_chrom - ((n_full_windows - 1) * window_size)) 1701 1702 physical_pos = np.array(physical_pos) 1703 chromosomes = np.array(chromosomes) 1704 window_sizes = np.array(window_sizes) 1705 1706 # Custom start and end positions 1707 elif physical_pos is not None: 1708 # Check if there is exactly one chromosome 1709 if chromosomes is None: 1710 unique_chrom = self.unique_chrom 1711 if len(unique_chrom) == 1: 1712 # We assume all windows belong to this single chromosome 1713 single_chrom = unique_chrom[0] 1714 chromosomes = np.array([single_chrom] * physical_pos.shape[0]) 1715 else: 1716 raise ValueError("Multiple chromosomes detected, but `chromosomes` was not provided.") 1717 1718 # Match existing windows to a reference laiobj 1719 elif laiobj is not None: 1720 physical_pos = laiobj.physical_pos 1721 chromosomes = laiobj.chromosomes 1722 window_sizes = laiobj.window_sizes 1723 1724 # Allocate an output LAI array 1725 n_windows = physical_pos.shape[0] 1726 n_samples = self.n_samples 1727 if self.calldata_lai.ndim == 3: 1728 lai = np.zeros((n_windows, n_samples, 2)) 1729 else: 1730 lai = np.zeros((n_windows, n_samples*2)) 1731 1732 # For each window, find the relevant SNPs and compute the mode of the ancestries 1733 for i, ((start, end), chrom) in enumerate(zip(physical_pos, chromosomes)): 1734 snps_mask = ( 1735 (self.variants_chrom == chrom) & 1736 (self.variants_pos >= start) & 1737 (self.variants_pos <= end) 1738 ) 1739 if np.any(snps_mask): 1740 lai_mask = self.calldata_lai[snps_mask, ...] 1741 mode_ancestries = mode(lai_mask, axis=0, nan_policy='omit').mode 1742 lai[i] = mode_ancestries 1743 else: 1744 lai[i] = np.nan 1745 1746 # Generate haplotype labels, e.g. "Sample1.0", "Sample1.1" 1747 haplotypes = [f"{sample}.{i}" for sample in self.samples for i in range(2)] 1748 1749 # If original data was (n_snps, n_samples, 2), flatten to (n_windows, n_samples*2) 1750 if self.calldata_lai.ndim == 3: 1751 lai = lai.reshape(n_windows, -1) 1752 1753 # Aggregate into a LocalAncestryObject 1754 return LocalAncestryObject( 1755 haplotypes=haplotypes, 1756 lai=lai, 1757 samples=self.samples, 1758 ancestry_map=self.ancestry_map, 1759 window_sizes=window_sizes, 1760 physical_pos=physical_pos, 1761 chromosomes=chromosomes 1762 ) 1763 1764 def save(self, file: Union[str, Path]) -> None: 1765 """ 1766 Save the data stored in `self` to a specified file. 1767 1768 The format of the saved file is determined by the file extension provided in the `file` 1769 argument. 1770 1771 **Supported formats:** 1772 1773 - `.bed`: Binary PED (Plink) format. 1774 - `.pgen`: Plink2 binary genotype format. 1775 - `.vcf`: Variant Call Format. 1776 - `.pkl`: Pickle format for saving `self` in serialized form. 1777 1778 Args: 1779 file (str or pathlib.Path): 1780 Path to the file where the data will be saved. The extension of the file determines the save format. 1781 Supported extensions: `.bed`, `.pgen`, `.vcf`, `.pkl`. 1782 """ 1783 ext = Path(file).suffix.lower() 1784 if ext == '.bed': 1785 self.save_bed(file) 1786 elif ext == '.pgen': 1787 self.save_pgen(file) 1788 elif ext == '.vcf': 1789 self.save_vcf(file) 1790 elif ext == '.pkl': 1791 self.save_pickle(file) 1792 else: 1793 raise ValueError(f"Unsupported file extension: {ext}") 1794 1795 def save_bed(self, file: Union[str, Path]) -> None: 1796 """ 1797 Save the data stored in `self` to a `.bed` file. 1798 1799 Args: 1800 file (str or pathlib.Path): 1801 Path to the file where the data will be saved. It should end with `.bed`. 1802 If the provided path does not have this extension, it will be appended. 1803 """ 1804 from snputils.snp.io.write.bed import BEDWriter 1805 writer = BEDWriter(snpobj=self, filename=file) 1806 writer.write() 1807 1808 def save_pgen(self, file: Union[str, Path]) -> None: 1809 """ 1810 Save the data stored in `self` to a `.pgen` file. 1811 1812 Args: 1813 file (str or pathlib.Path): 1814 Path to the file where the data will be saved. It should end with `.pgen`. 1815 If the provided path does not have this extension, it will be appended. 1816 """ 1817 from snputils.snp.io.write.pgen import PGENWriter 1818 writer = PGENWriter(snpobj=self, filename=file) 1819 writer.write() 1820 1821 def save_vcf(self, file: Union[str, Path]) -> None: 1822 """ 1823 Save the data stored in `self` to a `.vcf` file. 1824 1825 Args: 1826 file (str or pathlib.Path): 1827 Path to the file where the data will be saved. It should end with `.vcf`. 1828 If the provided path does not have this extension, it will be appended. 1829 """ 1830 from snputils.snp.io.write.vcf import VCFWriter 1831 writer = VCFWriter(snpobj=self, filename=file) 1832 writer.write() 1833 1834 def save_pickle(self, file: Union[str, Path]) -> None: 1835 """ 1836 Save `self` in serialized form to a `.pkl` file. 1837 1838 Args: 1839 file (str or pathlib.Path): 1840 Path to the file where the data will be saved. It should end with `.pkl`. 1841 If the provided path does not have this extension, it will be appended. 1842 """ 1843 import pickle 1844 with open(file, 'wb') as file: 1845 pickle.dump(self, file) 1846 1847 @staticmethod 1848 def _match_to_replace(val: Union[str, int, float], dictionary: Dict[Any, Any], regex: bool = True) -> Union[str, int, float]: 1849 """ 1850 Find a matching key in the provided dictionary for the given value `val` 1851 and replace it with the corresponding value. 1852 1853 Args: 1854 val (str, int, or float): 1855 The value to be matched and potentially replaced. 1856 dictionary (Dict): 1857 A dictionary containing keys and values for matching and replacement. 1858 The keys should match the data type of `val`. 1859 regex (bool): 1860 If True, interprets keys in `dictionary` as regular expressions. 1861 Default is True. 1862 1863 Returns: 1864 str, int, or float: 1865 The replacement value from `dictionary` if a match is found; otherwise, the original `val`. 1866 """ 1867 if regex: 1868 # Use regular expression matching to find replacements 1869 for key, value in dictionary.items(): 1870 if isinstance(key, str): 1871 match = re.match(key, val) 1872 if match: 1873 # Replace using the first matching regex pattern 1874 return re.sub(key, value, val) 1875 # Return the original value if no regex match is found 1876 return val 1877 else: 1878 # Return the value for `val` if present in `dictionary`; otherwise, return `val` 1879 return dictionary.get(val, val) 1880 1881 @staticmethod 1882 def _get_chromosome_number(chrom_string: str) -> Union[int, str]: 1883 """ 1884 Extracts the chromosome number from the given chromosome string. 1885 1886 Args: 1887 chrom_string (str): 1888 The chromosome identifier. 1889 1890 Returns: 1891 int or str: 1892 The numeric representation of the chromosome if detected. 1893 Returns 10001 for 'X' or 'chrX', 10002 for 'Y' or 'chrY', 1894 and the original `chrom_string` if unrecognized. 1895 """ 1896 if chrom_string.isdigit(): 1897 return int(chrom_string) 1898 else: 1899 chrom_num = re.search(r'\d+', chrom_string) 1900 if chrom_num: 1901 return int(chrom_num.group()) 1902 elif chrom_string.lower() in ['x', 'chrx']: 1903 return 10001 1904 elif chrom_string.lower() in ['y', 'chry']: 1905 return 10002 1906 else: 1907 log.warning(f"Chromosome nomenclature not standard. Chromosome: {chrom_string}") 1908 return chrom_string 1909 1910 def _sanity_check(self) -> None: 1911 """ 1912 Perform sanity checks to ensure LAI and ancestry map consistency. 1913 1914 This method checks that all unique ancestries in the LAI data are represented 1915 in the ancestry map if it is provided. 1916 """ 1917 if self.__calldata_lai is not None and self.__ancestry_map is not None: 1918 unique_ancestries = np.unique(self.__calldata_lai) 1919 missing_ancestries = [anc for anc in unique_ancestries if str(anc) not in self.__ancestry_map] 1920 if missing_ancestries: 1921 warnings.warn(f"Missing ancestries in ancestry_map: {missing_ancestries}")
A class for Single Nucleotide Polymorphism (SNP) data, with optional support for SNP-level Local Ancestry Information (LAI).
22 def __init__( 23 self, 24 calldata_gt: Optional[np.ndarray] = None, 25 samples: Optional[np.ndarray] = None, 26 variants_ref: Optional[np.ndarray] = None, 27 variants_alt: Optional[np.ndarray] = None, 28 variants_chrom: Optional[np.ndarray] = None, 29 variants_filter_pass: Optional[np.ndarray] = None, 30 variants_id: Optional[np.ndarray] = None, 31 variants_pos: Optional[np.ndarray] = None, 32 variants_qual: Optional[np.ndarray] = None, 33 calldata_lai: Optional[np.ndarray] = None, 34 ancestry_map: Optional[Dict[str, str]] = None 35 ) -> None: 36 """ 37 Args: 38 calldata_gt (array, optional): 39 An array containing genotype data for each sample. This array can be either 2D with shape 40 `(n_snps, n_samples)` if the paternal and maternal strands are summed, or 3D with shape 41 `(n_snps, n_samples, 2)` if the strands are kept separate. 42 samples (array of shape (n_sampels,), optional): 43 An array containing unique sample identifiers. 44 variants_ref (array of shape (n_snps,), optional): 45 An array containing the reference allele for each SNP. 46 variants_alt (array of shape (n_snps,), optional): 47 An array containing the alternate allele for each SNP. 48 variants_chrom (array of shape (n_snps,), optional): 49 An array containing the chromosome for each SNP. 50 variants_filter_pass (array of shape (n_snps,), optional): 51 An array indicating whether each SNP passed control checks. 52 variants_id (array of shape (n_snps,), optional): 53 An array containing unique identifiers (IDs) for each SNP. 54 variants_pos (array of shape (n_snps,), optional): 55 An array containing the chromosomal positions for each SNP. 56 variants_qual (array of shape (n_snps,), optional): 57 An array containing the Phred-scaled quality score for each SNP. 58 calldata_lai (array, optional): 59 An array containing the ancestry for each SNP. This array can be either 2D with shape 60 `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2). 61 ancestry_map (dict of str to str, optional): 62 A dictionary mapping ancestry codes to region names. 63 """ 64 self.__calldata_gt = calldata_gt 65 self.__samples = samples 66 self.__variants_ref = variants_ref 67 self.__variants_alt = variants_alt 68 self.__variants_chrom = variants_chrom 69 self.__variants_filter_pass = variants_filter_pass 70 self.__variants_id = variants_id 71 self.__variants_pos = variants_pos 72 self.__variants_qual = variants_qual 73 self.__calldata_lai = calldata_lai 74 self.__ancestry_map = ancestry_map 75 76 self._sanity_check()
Arguments:
- calldata_gt (array, optional): An array containing genotype data for each sample. This array can be either 2D with shape
(n_snps, n_samples)
if the paternal and maternal strands are summed, or 3D with shape(n_snps, n_samples, 2)
if the strands are kept separate. - samples (array of shape (n_sampels,), optional): An array containing unique sample identifiers.
- variants_ref (array of shape (n_snps,), optional): An array containing the reference allele for each SNP.
- variants_alt (array of shape (n_snps,), optional): An array containing the alternate allele for each SNP.
- variants_chrom (array of shape (n_snps,), optional): An array containing the chromosome for each SNP.
- variants_filter_pass (array of shape (n_snps,), optional): An array indicating whether each SNP passed control checks.
- variants_id (array of shape (n_snps,), optional): An array containing unique identifiers (IDs) for each SNP.
- variants_pos (array of shape (n_snps,), optional): An array containing the chromosomal positions for each SNP.
- variants_qual (array of shape (n_snps,), optional): An array containing the Phred-scaled quality score for each SNP.
- calldata_lai (array, optional): An array containing the ancestry for each SNP. This array can be either 2D with shape
(n_snps, n_samples*2)
, or 3D with shape (n_snps, n_samples, 2). - ancestry_map (dict of str to str, optional): A dictionary mapping ancestry codes to region names.
98 @property 99 def calldata_gt(self) -> np.ndarray: 100 """ 101 Retrieve `calldata_gt`. 102 103 Returns: 104 **array:** 105 An array containing genotype data for each sample. This array can be either 2D with shape 106 `(n_snps, n_samples)` if the paternal and maternal strands are summed, or 3D with shape 107 `(n_snps, n_samples, 2)` if the strands are kept separate. 108 """ 109 return self.__calldata_gt
Retrieve calldata_gt
.
Returns:
array: An array containing genotype data for each sample. This array can be either 2D with shape
(n_snps, n_samples)
if the paternal and maternal strands are summed, or 3D with shape(n_snps, n_samples, 2)
if the strands are kept separate.
118 @property 119 def samples(self) -> Optional[np.ndarray]: 120 """ 121 Retrieve `samples`. 122 123 Returns: 124 **array of shape (n_sampels,):** 125 An array containing unique sample identifiers. 126 """ 127 return self.__samples
Retrieve samples
.
Returns:
array of shape (n_sampels,): An array containing unique sample identifiers.
136 @property 137 def variants_ref(self) -> Optional[np.ndarray]: 138 """ 139 Retrieve `variants_ref`. 140 141 Returns: 142 **array of shape (n_snps,):** An array containing the reference allele for each SNP. 143 """ 144 return self.__variants_ref
Retrieve variants_ref
.
Returns:
array of shape (n_snps,): An array containing the reference allele for each SNP.
153 @property 154 def variants_alt(self) -> Optional[np.ndarray]: 155 """ 156 Retrieve `variants_alt`. 157 158 Returns: 159 **array of shape (n_snps,):** An array containing the alternate allele for each SNP. 160 """ 161 return self.__variants_alt
Retrieve variants_alt
.
Returns:
array of shape (n_snps,): An array containing the alternate allele for each SNP.
170 @property 171 def variants_chrom(self) -> Optional[np.ndarray]: 172 """ 173 Retrieve `variants_chrom`. 174 175 Returns: 176 **array of shape (n_snps,):** An array containing the chromosome for each SNP. 177 """ 178 return self.__variants_chrom
Retrieve variants_chrom
.
Returns:
array of shape (n_snps,): An array containing the chromosome for each SNP.
187 @property 188 def variants_filter_pass(self) -> Optional[np.ndarray]: 189 """ 190 Retrieve `variants_filter_pass`. 191 192 Returns: 193 **array of shape (n_snps,):** An array indicating whether each SNP passed control checks. 194 """ 195 return self.__variants_filter_pass
Retrieve variants_filter_pass
.
Returns:
array of shape (n_snps,): An array indicating whether each SNP passed control checks.
204 @property 205 def variants_id(self) -> Optional[np.ndarray]: 206 """ 207 Retrieve `variants_id`. 208 209 Returns: 210 **array of shape (n_snps,):** An array containing unique identifiers (IDs) for each SNP. 211 """ 212 return self.__variants_id
Retrieve variants_id
.
Returns:
array of shape (n_snps,): An array containing unique identifiers (IDs) for each SNP.
221 @property 222 def variants_pos(self) -> Optional[np.ndarray]: 223 """ 224 Retrieve `variants_pos`. 225 226 Returns: 227 **array of shape (n_snps,):** An array containing the chromosomal positions for each SNP. 228 """ 229 return self.__variants_pos
Retrieve variants_pos
.
Returns:
array of shape (n_snps,): An array containing the chromosomal positions for each SNP.
238 @property 239 def variants_qual(self) -> Optional[np.ndarray]: 240 """ 241 Retrieve `variants_qual`. 242 243 Returns: 244 **array of shape (n_snps,):** An array containing the Phred-scaled quality score for each SNP. 245 """ 246 return self.__variants_qual
Retrieve variants_qual
.
Returns:
array of shape (n_snps,): An array containing the Phred-scaled quality score for each SNP.
255 @property 256 def calldata_lai(self) -> Optional[np.ndarray]: 257 """ 258 Retrieve `calldata_lai`. 259 260 Returns: 261 **array:** 262 An array containing the ancestry for each SNP. This array can be either 2D with shape 263 `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2). 264 """ 265 return self.__calldata_lai
Retrieve calldata_lai
.
Returns:
array: An array containing the ancestry for each SNP. This array can be either 2D with shape
(n_snps, n_samples*2)
, or 3D with shape (n_snps, n_samples, 2).
274 @property 275 def ancestry_map(self) -> Optional[Dict[str, str]]: 276 """ 277 Retrieve `ancestry_map`. 278 279 Returns: 280 **dict of str to str:** A dictionary mapping ancestry codes to region names. 281 """ 282 return self.__ancestry_map
Retrieve ancestry_map
.
Returns:
dict of str to str: A dictionary mapping ancestry codes to region names.
291 @property 292 def n_samples(self) -> int: 293 """ 294 Retrieve `n_samples`. 295 296 Returns: 297 **int:** The total number of samples. 298 """ 299 if self.__samples is not None: 300 return len(self.__samples) 301 elif self.__calldata_gt is not None: 302 return self.__calldata_gt.shape[1] 303 elif self.__calldata_lai is not None: 304 if self.__calldata_lai.ndim == 2: 305 return self.__calldata_lai.shape[1] // 2 306 elif self.__calldata_lai.ndim == 3: 307 return self.__calldata_lai.shape[1] 308 else: 309 raise ValueError("Unable to determine the total number of samples: no relevant data is available.")
311 @property 312 def n_snps(self) -> int: 313 """ 314 Retrieve `n_snps`. 315 316 Returns: 317 **int:** The total number of SNPs. 318 """ 319 # List of attributes that can indicate the number of SNPs 320 potential_attributes = [ 321 self.__calldata_gt, 322 self.__variants_ref, 323 self.__variants_alt, 324 self.__variants_chrom, 325 self.__variants_filter_pass, 326 self.__variants_id, 327 self.__variants_pos, 328 self.__variants_qual, 329 self.__calldata_lai 330 ] 331 332 # Check each attribute for its first dimension, which corresponds to `n_snps` 333 for attr in potential_attributes: 334 if attr is not None: 335 return attr.shape[0] 336 337 raise ValueError("Unable to determine the total number of SNPs: no relevant data is available.")
339 @property 340 def n_chrom(self) -> Optional[int]: 341 """ 342 Retrieve `n_chrom`. 343 344 Returns: 345 **int:** The total number of unique chromosomes in `variants_chrom`. 346 """ 347 if self.variants_chrom is None: 348 warnings.warn("Chromosome data `variants_chrom` is None.") 349 return None 350 351 return len(self.unique_chrom)
353 @property 354 def n_ancestries(self) -> int: 355 """ 356 Retrieve `n_ancestries`. 357 358 Returns: 359 **int:** The total number of unique ancestries. 360 """ 361 if self.__calldata_lai is not None: 362 return len(np.unique(self.__calldata_lai)) 363 else: 364 raise ValueError("Unable to determine the total number of ancestries: no relevant data is available.")
366 @property 367 def unique_chrom(self) -> Optional[np.ndarray]: 368 """ 369 Retrieve `unique_chrom`. 370 371 Returns: 372 **array:** The unique chromosome names in `variants_chrom`, preserving their order of appearance. 373 """ 374 if self.variants_chrom is None: 375 warnings.warn("Chromosome data `variants_chrom` is None.") 376 return None 377 378 # Identify unique chromosome names and their first indexes of occurrence 379 _, idx = np.unique(self.variants_chrom, return_index=True) 380 # Return chromosome names sorted by their first occurrence to maintain original order 381 return self.variants_chrom[np.sort(idx)]
Retrieve unique_chrom
.
Returns:
array: The unique chromosome names in
variants_chrom
, preserving their order of appearance.
383 @property 384 def are_strands_summed(self) -> bool: 385 """ 386 Retrieve `are_strands_summed`. 387 388 Returns: 389 **bool:** 390 True if the maternal and paternal strands have been summed together, which is indicated by 391 `calldata_gt` having shape `(n_samples, n_snps)`. False if the strands are stored separately, 392 indicated by `calldata_gt` having shape `(n_samples, n_snps, 2)`. 393 """ 394 if self.calldata_gt is None: 395 warnings.warn("Genotype data `calldata_gt` is None.") 396 return None 397 398 return self.calldata_gt.ndim == 2
Retrieve are_strands_summed
.
Returns:
bool: True if the maternal and paternal strands have been summed together, which is indicated by
calldata_gt
having shape(n_samples, n_snps)
. False if the strands are stored separately, indicated bycalldata_gt
having shape(n_samples, n_snps, 2)
.
400 def copy(self) -> 'SNPObject': 401 """ 402 Create and return a copy of `self`. 403 404 Returns: 405 **SNPObject:** 406 A new instance of the current object. 407 """ 408 return copy.deepcopy(self)
Create and return a copy of self
.
Returns:
SNPObject: A new instance of the current object.
410 def keys(self) -> List[str]: 411 """ 412 Retrieve a list of public attribute names for `self`. 413 414 Returns: 415 **list of str:** 416 A list of attribute names, with internal name-mangling removed, 417 for easier reference to public attributes in the instance. 418 """ 419 return [attr.replace('_SNPObject__', '') for attr in vars(self)]
Retrieve a list of public attribute names for self
.
Returns:
list of str: A list of attribute names, with internal name-mangling removed, for easier reference to public attributes in the instance.
421 def sum_strands(self, inplace: bool = False) -> Optional['SNPObject']: 422 """ 423 Sum paternal and maternal strands. 424 425 Args: 426 inplace (bool, default=False): 427 If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 428 filtered. Default is False. 429 430 Returns: 431 **Optional[SNPObject]:** 432 A new `SNPObject` with summed strands if `inplace=False`. 433 If `inplace=True`, modifies `self` in place and returns None. 434 """ 435 if self.calldata_gt is None: 436 warnings.warn("Genotype data `calldata_gt` is None.") 437 return None if not inplace else self 438 439 if self.are_strands_summed: 440 warnings.warn("Genotype data `calldata_gt` is already summed.") 441 return self if inplace else self.copy() 442 443 if inplace: 444 self.calldata_gt = self.calldata_gt.sum(axis=2, dtype=np.int8) 445 return self 446 else: 447 snpobj = self.copy() 448 snpobj.calldata_gt = self.calldata_gt.sum(axis=2, dtype=np.int8) 449 return snpobj
Sum paternal and maternal strands.
Arguments:
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the variants filtered. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with summed strands ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
451 def filter_variants( 452 self, 453 chrom: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 454 pos: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 455 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 456 include: bool = True, 457 inplace: bool = False 458 ) -> Optional['SNPObject']: 459 """ 460 Filter variants based on specified chromosome names, variant positions, or variant indexes. 461 462 This method updates the `calldata_gt`, `variants_ref`, `variants_alt`, 463 `variants_chrom`, `variants_filter_pass`, `variants_id`, `variants_pos`, 464 `variants_qual`, and `lai` attributes to include or exclude the specified variants. The filtering 465 criteria can be based on chromosome names, variant positions, or indexes. If multiple 466 criteria are provided, their union is used for filtering. The order of the variants is preserved. 467 468 Negative indexes are supported and follow 469 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 470 471 Args: 472 chrom (str or array_like of str, optional): 473 Chromosome(s) to filter variants by. Can be a single chromosome as a string or a sequence 474 of chromosomes. If both `chrom` and `pos` are provided, they must either have matching lengths 475 (pairing each chromosome with a position) or `chrom` should be a single value that applies to 476 all positions in `pos`. Default is None. 477 pos (int or array_like of int, optional): 478 Position(s) to filter variants by. Can be a single position as an integer or a sequence of positions. 479 If `chrom` is also provided, `pos` should either match `chrom` in length or `chrom` should be a 480 single value. Default is None. 481 indexes (int or array_like of int, optional): 482 Index(es) of the variants to include or exclude. Can be a single index or a sequence 483 of indexes. Negative indexes are supported. Default is None. 484 include (bool, default=True): 485 If True, includes only the specified variants. If False, excludes the specified 486 variants. Default is True. 487 inplace (bool, default=False): 488 If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 489 filtered. Default is False. 490 491 Returns: 492 **Optional[SNPObject]:** 493 A new `SNPObject` with the specified variants filtered if `inplace=False`. 494 If `inplace=True`, modifies `self` in place and returns None. 495 """ 496 if chrom is None and pos is None and indexes is None: 497 raise ValueError("At least one of 'chrom', 'pos', or 'indexes' must be provided.") 498 499 n_snps = self.n_snps 500 501 # Convert inputs to arrays for consistency 502 chrom = np.atleast_1d(chrom) if chrom is not None else None 503 pos = np.atleast_1d(pos) if pos is not None else None 504 indexes = np.atleast_1d(indexes) if indexes is not None else None 505 506 # Validate chrom and pos lengths if both are provided 507 if chrom is not None and pos is not None: 508 if len(chrom) != len(pos) and len(chrom) > 1: 509 raise ValueError( 510 "When both 'chrom' and 'pos' are provided, they must either be of the same length " 511 "or 'chrom' must be a single value." 512 ) 513 514 # Create a mask for chromosome and position filtering 515 mask_combined = np.zeros(n_snps, dtype=bool) 516 if chrom is not None and pos is not None: 517 if len(chrom) == 1: 518 # Apply single chromosome to all positions in `pos` 519 mask_combined = (self['variants_chrom'] == chrom[0]) & np.isin(self['variants_pos'], pos) 520 else: 521 # Vectorized pair matching for chrom and pos 522 query_pairs = np.array( 523 list(zip(chrom, pos)), 524 dtype=[ 525 ('chrom', self['variants_chrom'].dtype), 526 ('pos', self['variants_pos'].dtype) 527 ] 528 ) 529 data_pairs = np.array( 530 list(zip(self['variants_chrom'], self['variants_pos'])), 531 dtype=[ 532 ('chrom', self['variants_chrom'].dtype), 533 ('pos', self['variants_pos'].dtype) 534 ] 535 ) 536 mask_combined = np.isin(data_pairs, query_pairs) 537 538 elif chrom is not None: 539 # Only chromosome filtering 540 mask_combined = np.isin(self['variants_chrom'], chrom) 541 elif pos is not None: 542 # Only position filtering 543 mask_combined = np.isin(self['variants_pos'], pos) 544 545 # Create mask based on indexes if provided 546 if indexes is not None: 547 # Validate indexes, allowing negative indexes 548 out_of_bounds_indexes = indexes[(indexes < -n_snps) | (indexes >= n_snps)] 549 if out_of_bounds_indexes.size > 0: 550 raise ValueError(f"One or more sample indexes are out of bounds.") 551 552 # Handle negative indexes and check for out-of-bounds indexes 553 adjusted_indexes = np.mod(indexes, n_snps) 554 555 # Create mask for specified indexes 556 mask_indexes = np.zeros(n_snps, dtype=bool) 557 mask_indexes[adjusted_indexes] = True 558 559 # Combine with `chrom` and `pos` mask using logical OR (union of all specified criteria) 560 mask_combined = mask_combined | mask_indexes 561 562 # Invert mask if `include` is False 563 if not include: 564 mask_combined = ~mask_combined 565 566 # Define keys to filter 567 keys = [ 568 'calldata_gt', 'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 569 'variants_id', 'variants_pos', 'variants_qual', 'calldata_lai' 570 ] 571 572 # Apply filtering based on inplace parameter 573 if inplace: 574 for key in keys: 575 if self[key] is not None: 576 if self[key].ndim > 1: 577 self[key] = np.asarray(self[key])[mask_combined, ...] 578 else: 579 self[key] = np.asarray(self[key])[mask_combined] 580 581 return None 582 else: 583 # Create A new `SNPObject` with filtered data 584 snpobj = self.copy() 585 for key in keys: 586 if snpobj[key] is not None: 587 if snpobj[key].ndim > 1: 588 snpobj[key] = np.asarray(snpobj[key])[mask_combined, ...] 589 else: 590 snpobj[key] = np.asarray(snpobj[key])[mask_combined] 591 592 return snpobj
Filter variants based on specified chromosome names, variant positions, or variant indexes.
This method updates the calldata_gt
, variants_ref
, variants_alt
,
variants_chrom
, variants_filter_pass
, variants_id
, variants_pos
,
variants_qual
, and lai
attributes to include or exclude the specified variants. The filtering
criteria can be based on chromosome names, variant positions, or indexes. If multiple
criteria are provided, their union is used for filtering. The order of the variants is preserved.
Negative indexes are supported and follow NumPy's indexing conventions.
Arguments:
- chrom (str or array_like of str, optional): Chromosome(s) to filter variants by. Can be a single chromosome as a string or a sequence
of chromosomes. If both
chrom
andpos
are provided, they must either have matching lengths (pairing each chromosome with a position) orchrom
should be a single value that applies to all positions inpos
. Default is None. - pos (int or array_like of int, optional): Position(s) to filter variants by. Can be a single position as an integer or a sequence of positions.
If
chrom
is also provided,pos
should either matchchrom
in length orchrom
should be a single value. Default is None. - indexes (int or array_like of int, optional): Index(es) of the variants to include or exclude. Can be a single index or a sequence of indexes. Negative indexes are supported. Default is None.
- include (bool, default=True): If True, includes only the specified variants. If False, excludes the specified variants. Default is True.
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the variants filtered. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with the specified variants filtered ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
594 def filter_samples( 595 self, 596 samples: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 597 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 598 include: bool = True, 599 inplace: bool = False 600 ) -> Optional['SNPObject']: 601 """ 602 Filter samples based on specified names or indexes. 603 604 This method updates the `samples` and `calldata_gt` attributes to include or exclude the specified 605 samples. The order of the samples is preserved. 606 607 If both samples and indexes are provided, any sample matching either a name in samples or an index in 608 indexes will be included or excluded. 609 610 This method allows inclusion or exclusion of specific samples by their names or 611 indexes. When both sample names and indexes are provided, the union of the specified samples 612 is used. Negative indexes are supported and follow 613 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 614 615 Args: 616 samples (str or array_like of str, optional): 617 Name(s) of the samples to include or exclude. Can be a single sample name or a 618 sequence of sample names. Default is None. 619 indexes (int or array_like of int, optional): 620 Index(es) of the samples to include or exclude. Can be a single index or a sequence 621 of indexes. Negative indexes are supported. Default is None. 622 include (bool, default=True): 623 If True, includes only the specified samples. If False, excludes the specified 624 samples. Default is True. 625 inplace (bool, default=False): 626 If True, modifies `self` in place. If False, returns a new `SNPObject` with the samples 627 filtered. Default is False. 628 629 Returns: 630 **Optional[SNPObject]:** 631 A new `SNPObject` with the specified samples filtered if `inplace=False`. 632 If `inplace=True`, modifies `self` in place and returns None. 633 """ 634 if samples is None and indexes is None: 635 raise ValueError("At least one of 'samples' or 'indexes' must be provided.") 636 637 n_samples = self.n_samples 638 sample_names = np.array(self['samples']) 639 640 # Create mask based on sample names 641 if samples is not None: 642 samples = np.atleast_1d(samples) 643 mask_samples = np.isin(sample_names, samples) 644 missing_samples = samples[~np.isin(samples, sample_names)] 645 if missing_samples.size > 0: 646 raise ValueError(f"The following specified samples were not found: {missing_samples.tolist()}") 647 else: 648 mask_samples = np.zeros(n_samples, dtype=bool) 649 650 # Create mask based on sample indexes 651 if indexes is not None: 652 indexes = np.atleast_1d(indexes) 653 654 # Validate indexes, allowing negative indexes 655 out_of_bounds_indexes = indexes[(indexes < -n_samples) | (indexes >= n_samples)] 656 if out_of_bounds_indexes.size > 0: 657 raise ValueError(f"One or more sample indexes are out of bounds.") 658 659 # Handle negative indexes 660 adjusted_indexes = np.mod(indexes, n_samples) 661 662 mask_indexes = np.zeros(n_samples, dtype=bool) 663 mask_indexes[adjusted_indexes] = True 664 else: 665 mask_indexes = np.zeros(n_samples, dtype=bool) 666 667 # Combine masks using logical OR (union of samples) 668 mask_combined = mask_samples | mask_indexes 669 670 if not include: 671 mask_combined = ~mask_combined 672 673 # Define keys to filter 674 keys = ['samples', 'calldata_gt', 'calldata_lai'] 675 676 # Apply filtering based on inplace parameter 677 if inplace: 678 for key in keys: 679 if self[key] is not None: 680 if self[key].ndim > 1: 681 self[key] = np.asarray(self[key])[:, mask_combined, ...] 682 else: 683 self[key] = np.asarray(self[key])[mask_combined] 684 685 return None 686 else: 687 # Create A new `SNPObject` with filtered data 688 snpobj = self.copy() 689 for key in keys: 690 if snpobj[key] is not None: 691 if snpobj[key].ndim > 1: 692 snpobj[key] = np.asarray(snpobj[key])[:, mask_combined, ...] 693 else: 694 snpobj[key] = np.asarray(snpobj[key])[mask_combined] 695 return snpobj
Filter samples based on specified names or indexes.
This method updates the samples
and calldata_gt
attributes to include or exclude the specified
samples. The order of the samples is preserved.
If both samples and indexes are provided, any sample matching either a name in samples or an index in indexes will be included or excluded.
This method allows inclusion or exclusion of specific samples by their names or indexes. When both sample names and indexes are provided, the union of the specified samples is used. Negative indexes are supported and follow NumPy's indexing conventions.
Arguments:
- samples (str or array_like of str, optional): Name(s) of the samples to include or exclude. Can be a single sample name or a sequence of sample names. Default is None.
- indexes (int or array_like of int, optional): Index(es) of the samples to include or exclude. Can be a single index or a sequence of indexes. Negative indexes are supported. Default is None.
- include (bool, default=True): If True, includes only the specified samples. If False, excludes the specified samples. Default is True.
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the samples filtered. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with the specified samples filtered ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
697 def detect_chromosome_format(self) -> str: 698 """ 699 Detect the chromosome naming convention in `variants_chrom` based on the prefix 700 of the first chromosome identifier in `unique_chrom`. 701 702 **Recognized formats:** 703 704 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 705 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 706 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 707 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 708 709 If the format does not match any recognized pattern, `'Unknown format'` is returned. 710 711 Returns: 712 **str:** 713 A string indicating the detected chromosome format (`'chr'`, `'chm'`, `'chrom'`, or `'plain'`). 714 If no recognized format is matched, returns `'Unknown format'`. 715 """ 716 # Select the first unique chromosome identifier for format detection 717 chromosome_str = self.unique_chrom[0] 718 719 # Define regular expressions to match each recognized chromosome format 720 patterns = { 721 'chr': r'^chr(\d+|X|Y|M)$', # Matches 'chr' prefixed format 722 'chm': r'^chm(\d+|X|Y|M)$', # Matches 'chm' prefixed format 723 'chrom': r'^chrom(\d+|X|Y|M)$', # Matches 'chrom' prefixed format 724 'plain': r'^(\d+|X|Y|M)$' # Matches plain format without prefix 725 } 726 727 # Iterate through the patterns to identify the chromosome format 728 for prefix, pattern in patterns.items(): 729 if re.match(pattern, chromosome_str): 730 return prefix # Return the recognized format prefix 731 732 # If no pattern matches, return 'Unknown format' 733 return 'Unknown format'
Detect the chromosome naming convention in variants_chrom
based on the prefix
of the first chromosome identifier in unique_chrom
.
Recognized formats:
'chr'
: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.'chm'
: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.'chrom'
: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.'plain'
: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
If the format does not match any recognized pattern, 'Unknown format'
is returned.
Returns:
str: A string indicating the detected chromosome format (
'chr'
,'chm'
,'chrom'
, or'plain'
). If no recognized format is matched, returns'Unknown format'
.
735 def convert_chromosome_format( 736 self, 737 from_format: str, 738 to_format: str, 739 inplace: bool = False 740 ) -> Optional['SNPObject']: 741 """ 742 Convert the chromosome format from one naming convention to another in `variants_chrom`. 743 744 **Supported formats:** 745 746 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 747 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 748 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 749 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 750 751 Args: 752 from_format (str): 753 The current chromosome format. Acceptable values are `'chr'`, `'chm'`, `'chrom'`, or `'plain'`. 754 to_format (str): 755 The target format for chromosome data conversion. Acceptable values match `from_format` options. 756 inplace (bool, default=False): 757 If True, modifies `self` in place. If False, returns a new `SNPObject` with the converted format. 758 Default is False. 759 760 Returns: 761 **Optional[SNPObject]:** A new `SNPObject` with the converted chromosome format if `inplace=False`. 762 If `inplace=True`, modifies `self` in place and returns None. 763 """ 764 # Define the list of standard chromosome identifiers 765 chrom_list = [*map(str, range(1, 23)), 'X', 'Y', 'M'] # M for mitochondrial chromosomes 766 767 # Format mappings for different chromosome naming conventions 768 format_mappings = { 769 'chr': [f'chr{i}' for i in chrom_list], 770 'chm': [f'chm{i}' for i in chrom_list], 771 'chrom': [f'chrom{i}' for i in chrom_list], 772 'plain': chrom_list, 773 } 774 775 # Verify that from_format and to_format are valid naming conventions 776 if from_format not in format_mappings or to_format not in format_mappings: 777 raise ValueError(f"Invalid format: {from_format} or {to_format}. Must be one of {list(format_mappings.keys())}.") 778 779 # Convert chromosomes to string for consistent comparison 780 variants_chrom = self['variants_chrom'].astype(str) 781 782 # Verify that all chromosomes in the object follow the specified `from_format` 783 expected_chroms = set(format_mappings[from_format]) 784 mismatched_chroms = set(variants_chrom) - expected_chroms 785 786 if mismatched_chroms: 787 raise ValueError(f"The following chromosomes do not match the `from_format` '{from_format}': {mismatched_chroms}.") 788 789 # Create conditions for selecting based on current `from_format` names 790 conditions = [variants_chrom == chrom for chrom in format_mappings[from_format]] 791 792 # Rename chromosomes based on inplace flag 793 if inplace: 794 self['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown') 795 return None 796 else: 797 snpobject = self.copy() 798 snpobject['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown') 799 return snpobject
Convert the chromosome format from one naming convention to another in variants_chrom
.
Supported formats:
'chr'
: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.'chm'
: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.'chrom'
: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.'plain'
: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
Arguments:
- from_format (str): The current chromosome format. Acceptable values are
'chr'
,'chm'
,'chrom'
, or'plain'
. - to_format (str): The target format for chromosome data conversion. Acceptable values match
from_format
options. - inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the converted format. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with the converted chromosome format ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
801 def match_chromosome_format(self, snpobj: 'SNPObject', inplace: bool = False) -> Optional['SNPObject']: 802 """ 803 Convert the chromosome format in `variants_chrom` from `self` to match the format of a reference `snpobj`. 804 805 **Recognized formats:** 806 807 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 808 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 809 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 810 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 811 812 Args: 813 snpobj (SNPObject): 814 The reference SNPObject whose chromosome format will be matched. 815 inplace (bool, default=False): 816 If True, modifies `self` in place. If False, returns a new `SNPObject` with the 817 chromosome format matching that of `snpobj`. Default is False. 818 819 Returns: 820 **Optional[SNPObject]:** 821 A new `SNPObject` with matched chromosome format if `inplace=False`. 822 If `inplace=True`, modifies `self` in place and returns None. 823 """ 824 # Detect the chromosome naming format of the current SNPObject 825 fmt1 = self.detect_chromosome_format() 826 if fmt1 == 'Unknown format': 827 raise ValueError("The chromosome format of the current SNPObject is unrecognized.") 828 829 # Detect the chromosome naming format of the reference SNPObject 830 fmt2 = snpobj.detect_chromosome_format() 831 if fmt2 == 'Unknown format': 832 raise ValueError("The chromosome format of the reference SNPObject is unrecognized.") 833 834 # Convert the current SNPObject's chromosome format to match the reference format 835 return self.convert_chromosome_format(fmt1, fmt2, inplace=inplace)
Convert the chromosome format in variants_chrom
from self
to match the format of a reference snpobj
.
Recognized formats:
'chr'
: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.'chm'
: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.'chrom'
: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.'plain'
: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
Arguments:
- snpobj (SNPObject): The reference SNPObject whose chromosome format will be matched.
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the chromosome format matching that ofsnpobj
. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with matched chromosome format ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
837 def rename_chrom( 838 self, 839 to_replace: Union[Dict[str, str], str, List[str]] = {'^([0-9]+)$': r'chr\1', r'^chr([0-9]+)$': r'\1'}, 840 value: Optional[Union[str, List[str]]] = None, 841 regex: bool = True, 842 inplace: bool = False 843 ) -> Optional['SNPObject']: 844 """ 845 Replace chromosome values in `variants_chrom` using patterns or exact matches. 846 847 This method allows flexible chromosome replacements, using regex or exact matches, useful 848 for non-standard chromosome formats. For standard conversions (e.g., 'chr1' to '1'), 849 consider `convert_chromosome_format`. 850 851 Args: 852 to_replace (dict, str, or list of str): 853 Pattern(s) or exact value(s) to be replaced in chromosome names. Default behavior 854 transforms `<chrom_num>` to `chr<chrom_num>` or vice versa. Non-matching values 855 remain unchanged. 856 - If str or list of str: Matches will be replaced with `value`. 857 - If regex (bool), then any regex matches will be replaced with `value`. 858 - If dict: Keys defines values to replace, with corresponding replacements as values. 859 value (str or list of str, optional): 860 Replacement value(s) if `to_replace` is a string or list. Ignored if `to_replace` 861 is a dictionary. 862 regex (bool, default=True): 863 If True, interprets `to_replace` keys as regex patterns. 864 inplace (bool, default=False): 865 If True, modifies `self` in place. If False, returns a new `SNPObject` with the chromosomes 866 renamed. Default is False. 867 868 Returns: 869 **Optional[SNPObject]:** A new `SNPObject` with the renamed chromosome format if `inplace=False`. 870 If `inplace=True`, modifies `self` in place and returns None. 871 """ 872 # Standardize input format: convert `to_replace` and `value` to a dictionary if needed 873 if isinstance(to_replace, (str, int)): 874 to_replace = [to_replace] 875 if isinstance(value, (str, int)): 876 value = [value] 877 if isinstance(to_replace, list) and isinstance(value, list): 878 dictionary = dict(zip(to_replace, value)) 879 elif isinstance(to_replace, dict) and value is None: 880 dictionary = to_replace 881 else: 882 raise ValueError( 883 "Invalid input: `to_replace` and `value` must be compatible types (both str, list of str, or dict)." 884 ) 885 886 # Vectorized function for replacing values in chromosome array 887 vec_replace_values = np.vectorize(self._match_to_replace) 888 889 # Rename chromosomes based on inplace flag 890 if inplace: 891 self.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex) 892 return None 893 else: 894 snpobj = self.copy() 895 snpobj.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex) 896 return snpobj
Replace chromosome values in variants_chrom
using patterns or exact matches.
This method allows flexible chromosome replacements, using regex or exact matches, useful
for non-standard chromosome formats. For standard conversions (e.g., 'chr1' to '1'),
consider convert_chromosome_format
.
Arguments:
- to_replace (dict, str, or list of str): Pattern(s) or exact value(s) to be replaced in chromosome names. Default behavior
transforms
<chrom_num>
tochr<chrom_num>
or vice versa. Non-matching values remain unchanged.- If str or list of str: Matches will be replaced with
value
. - If regex (bool), then any regex matches will be replaced with
value
. - If dict: Keys defines values to replace, with corresponding replacements as values.
- If str or list of str: Matches will be replaced with
- value (str or list of str, optional): Replacement value(s) if
to_replace
is a string or list. Ignored ifto_replace
is a dictionary. - regex (bool, default=True): If True, interprets
to_replace
keys as regex patterns. - inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the chromosomes renamed. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with the renamed chromosome format ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
898 def rename_missings( 899 self, 900 before: Union[int, float, str] = -1, 901 after: Union[int, float, str] = '.', 902 inplace: bool = False 903 ) -> Optional['SNPObject']: 904 """ 905 Replace missing values in the `calldata_gt` attribute. 906 907 This method identifies missing values in 'calldata_gt' and replaces them with a specified 908 value. By default, it replaces occurrences of `-1` (often used to signify missing data) with `'.'`. 909 910 Args: 911 before (int, float, or str, default=-1): 912 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 913 Default is -1. 914 after (int, float, or str, default='.'): 915 The value that will replace `before`. Default is '.'. 916 inplace (bool, default=False): 917 If True, modifies `self` in place. If False, returns a new `SNPObject` with the applied 918 replacements. Default is False. 919 920 Returns: 921 **Optional[SNPObject]:** 922 A new `SNPObject` with the renamed missing values if `inplace=False`. 923 If `inplace=True`, modifies `self` in place and returns None. 924 """ 925 # Rename missing values in the `calldata_gt` attribute based on inplace flag 926 if inplace: 927 self['calldata_gt'] = np.where(self['calldata_gt'] == before, after, self['calldata_gt']) 928 return None 929 else: 930 snpobj = self.copy() 931 snpobj['calldata_gt'] = np.where(snpobj['calldata_gt'] == before, after, snpobj['calldata_gt']) 932 return snpobj
Replace missing values in the calldata_gt
attribute.
This method identifies missing values in 'calldata_gt' and replaces them with a specified
value. By default, it replaces occurrences of -1
(often used to signify missing data) with '.'
.
Arguments:
- before (int, float, or str, default=-1): The current representation of missing values in
calldata_gt
. Common values might be -1, '.', or NaN. Default is -1. - after (int, float, or str, default='.'): The value that will replace
before
. Default is '.'. - inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the applied replacements. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with the renamed missing values ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
934 def get_common_variants_intersection( 935 self, 936 snpobj: 'SNPObject', 937 index_by: str = 'pos' 938 ) -> Tuple[List[str], np.ndarray, np.ndarray]: 939 """ 940 Identify common variants between `self` and the `snpobj` instance based on the specified `index_by` criterion, 941 which may match based on chromosome and position (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both. 942 943 This method returns the identifiers of common variants and their corresponding indices in both objects. 944 945 Args: 946 snpobj (SNPObject): 947 The reference SNPObject to compare against. 948 index_by (str, default='pos'): 949 Criteria for matching variants. Options: 950 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 951 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 952 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 953 Default is 'pos'. 954 955 Returns: 956 Tuple containing: 957 - **list of str:** A list of common variant identifiers (as strings). 958 - **array:** An array of indices in `self` where common variants are located. 959 - **array:** An array of indices in `snpobj` where common variants are located. 960 """ 961 # Create unique identifiers for each variant in both SNPObjects based on the specified criterion 962 if index_by == 'pos': 963 query_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(self['variants_chrom'], self['variants_pos'])] 964 reference_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(snpobj['variants_chrom'], snpobj['variants_pos'])] 965 elif index_by == 'id': 966 query_identifiers = self['variants_id'].tolist() 967 reference_identifiers = snpobj['variants_id'].tolist() 968 elif index_by == 'pos+id': 969 query_identifiers = [ 970 f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(self['variants_chrom'], self['variants_pos'], self['variants_id']) 971 ] 972 reference_identifiers = [ 973 f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_id']) 974 ] 975 else: 976 raise ValueError("`index_by` must be one of 'pos', 'id', or 'pos+id'.") 977 978 # Convert to sets for intersection 979 common_ids = set(query_identifiers).intersection(reference_identifiers) 980 981 # Collect indices for common identifiers 982 query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids] 983 reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids] 984 985 return list(common_ids), np.array(query_idx), np.array(reference_idx)
Identify common variants between self
and the snpobj
instance based on the specified index_by
criterion,
which may match based on chromosome and position (variants_chrom
, variants_pos
), ID (variants_id
), or both.
This method returns the identifiers of common variants and their corresponding indices in both objects.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- index_by (str, default='pos'): Criteria for matching variants. Options:
'pos'
: Matches by chromosome and position (variants_chrom
,variants_pos
), e.g., 'chr1-12345'.'id'
: Matches by variant ID alone (variants_id
), e.g., 'rs123'.'pos+id'
: Matches by chromosome, position, and ID (variants_chrom
,variants_pos
,variants_id
), e.g., 'chr1-12345-rs123'. Default is 'pos'.
Returns:
Tuple containing:
- list of str: A list of common variant identifiers (as strings).
- array: An array of indices in
self
where common variants are located.- array: An array of indices in
snpobj
where common variants are located.
987 def get_common_markers_intersection( 988 self, 989 snpobj: 'SNPObject' 990 ) -> Tuple[List[str], np.ndarray, np.ndarray]: 991 """ 992 Identify common markers between between `self` and the `snpobj` instance. Common markers are identified 993 based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 994 and alternate (`variants_alt`) alleles. 995 996 This method returns the identifiers of common markers and their corresponding indices in both objects. 997 998 Args: 999 snpobj (SNPObject): 1000 The reference SNPObject to compare against. 1001 1002 Returns: 1003 Tuple containing: 1004 - **list of str:** A list of common variant identifiers (as strings). 1005 - **array:** An array of indices in `self` where common variants are located. 1006 - **array:** An array of indices in `snpobj` where common variants are located. 1007 """ 1008 # Generate unique identifiers based on chrom, pos, ref, and alt alleles 1009 query_identifiers = [ 1010 f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 1011 zip(self['variants_chrom'], self['variants_pos'], self['variants_ref'], self['variants_alt']) 1012 ] 1013 reference_identifiers = [ 1014 f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 1015 zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_ref'], snpobj['variants_alt']) 1016 ] 1017 1018 # Convert to sets for intersection 1019 common_ids = set(query_identifiers).intersection(reference_identifiers) 1020 1021 # Collect indices for common identifiers in both SNPObjects 1022 query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids] 1023 reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids] 1024 1025 return list(common_ids), np.array(query_idx), np.array(reference_idx)
Identify common markers between between self
and the snpobj
instance. Common markers are identified
based on matching chromosome (variants_chrom
), position (variants_pos
), reference (variants_ref
),
and alternate (variants_alt
) alleles.
This method returns the identifiers of common markers and their corresponding indices in both objects.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
Returns:
Tuple containing:
- list of str: A list of common variant identifiers (as strings).
- array: An array of indices in
self
where common variants are located.- array: An array of indices in
snpobj
where common variants are located.
1027 def subset_to_common_variants( 1028 self, 1029 snpobj: 'SNPObject', 1030 index_by: str = 'pos', 1031 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1032 inplace: bool = False 1033 ) -> Optional['SNPObject']: 1034 """ 1035 Subset `self` to include only the common variants with a reference `snpobj` based on 1036 the specified `index_by` criterion, which may match based on chromosome and position 1037 (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both. 1038 1039 Args: 1040 snpobj (SNPObject): 1041 The reference SNPObject to compare against. 1042 index_by (str, default='pos'): 1043 Criteria for matching variants. Options: 1044 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1045 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1046 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1047 Default is 'pos'. 1048 common_variants_intersection (Tuple[np.ndarray, np.ndarray], optional): 1049 Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 1050 computed within the function. 1051 inplace (bool, default=False): 1052 If True, modifies `self` in place. If False, returns a new `SNPObject` with the common variants 1053 subsetted. Default is False. 1054 1055 Returns: 1056 **Optional[SNPObject]:** 1057 A new `SNPObject` with the common variants subsetted if `inplace=False`. 1058 If `inplace=True`, modifies `self` in place and returns None. 1059 """ 1060 # Get indices of common variants if not provided 1061 if common_variants_intersection is None: 1062 _, query_idx, _ = self.get_common_variants_intersection(snpobj, index_by=index_by) 1063 else: 1064 query_idx, _ = common_variants_intersection 1065 1066 # Use filter_variants method with the identified indices, applying `inplace` as specified 1067 return self.filter_variants(indexes=query_idx, include=True, inplace=inplace)
Subset self
to include only the common variants with a reference snpobj
based on
the specified index_by
criterion, which may match based on chromosome and position
(variants_chrom
, variants_pos
), ID (variants_id
), or both.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- index_by (str, default='pos'): Criteria for matching variants. Options:
'pos'
: Matches by chromosome and position (variants_chrom
,variants_pos
), e.g., 'chr1-12345'.'id'
: Matches by variant ID alone (variants_id
), e.g., 'rs123'.'pos+id'
: Matches by chromosome, position, and ID (variants_chrom
,variants_pos
,variants_id
), e.g., 'chr1-12345-rs123'. Default is 'pos'.
- common_variants_intersection (Tuple[np.ndarray, np.ndarray], optional): Precomputed indices of common variants between
self
andsnpobj
. If None, intersection is computed within the function. - inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the common variants subsetted. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with the common variants subsetted ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1069 def subset_to_common_markers( 1070 self, 1071 snpobj: 'SNPObject', 1072 common_markers_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1073 inplace: bool = False 1074 ) -> Optional['SNPObject']: 1075 """ 1076 Subset `self` to include only the common markers with a reference `snpobj`. Common markers are identified 1077 based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 1078 and alternate (`variants_alt`) alleles. 1079 1080 Args: 1081 snpobj (SNPObject): 1082 The reference SNPObject to compare against. 1083 common_markers_intersection (tuple of arrays, optional): 1084 Precomputed indices of common markers between `self` and `snpobj`. If None, intersection is 1085 computed within the function. 1086 inplace (bool, default=False): 1087 If True, modifies `self` in place. If False, returns a new `SNPObject` with the common markers 1088 subsetted. Default is False. 1089 1090 Returns: 1091 **Optional[SNPObject]:** 1092 A new `SNPObject` with the common markers subsetted if `inplace=False`. 1093 If `inplace=True`, modifies `self` in place and returns None. 1094 """ 1095 # Get indices of common markers if not provided 1096 if common_markers_intersection is None: 1097 _, query_idx, _ = self.get_common_markers_intersection(snpobj) 1098 else: 1099 query_idx, _ = common_markers_intersection 1100 1101 # Use filter_variants method with the identified indices, applying `inplace` as specified 1102 return self.filter_variants(indexes=query_idx, include=True, inplace=inplace)
Subset self
to include only the common markers with a reference snpobj
. Common markers are identified
based on matching chromosome (variants_chrom
), position (variants_pos
), reference (variants_ref
),
and alternate (variants_alt
) alleles.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- common_markers_intersection (tuple of arrays, optional): Precomputed indices of common markers between
self
andsnpobj
. If None, intersection is computed within the function. - inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the common markers subsetted. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with the common markers subsetted ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1104 def merge( 1105 self, 1106 snpobj: 'SNPObject', 1107 force_samples: bool = False, 1108 prefix: str = '2', 1109 inplace: bool = False 1110 ) -> Optional['SNPObject']: 1111 """ 1112 Merge `self` with `snpobj` along the sample axis. 1113 1114 This method expects both SNPObjects to contain the same set of SNPs in the same order, 1115 then combines their genotype (`calldata_gt`) and LAI (`calldata_lai`) arrays by 1116 concatenating the sample dimension. Samples from `snpobj` are appended to those in `self`. 1117 1118 Args: 1119 snpobj (SNPObject): 1120 The SNPObject to merge samples with. 1121 force_samples (bool, default=False): 1122 If True, duplicate sample names are resolved by prepending the `prefix` to duplicate sample names in 1123 `snpobj`. Otherwise, merging fails when duplicate sample names are found. Default is False. 1124 prefix (str, default='2'): 1125 A string prepended to duplicate sample names in `snpobj` when `force_samples=True`. 1126 Duplicates are renamed from `<sample_name>` to `<prefix>:<sample_name>`. For instance, 1127 if `prefix='2'` and there is a conflict with a sample called "sample_1", it becomes "2:sample_1". 1128 inplace (bool, default=False): 1129 If True, modifies `self` in place. If False, returns a new `SNPObject` with the merged samples. 1130 Default is False. 1131 1132 Returns: 1133 **Optional[SNPObject]**: A new SNPObject containing the merged sample data. 1134 """ 1135 # Merge calldata_gt if present and compatible 1136 if self.calldata_gt is not None and snpobj.calldata_gt is not None: 1137 if self.calldata_gt.shape[0] != snpobj.calldata_gt.shape[0]: 1138 raise ValueError( 1139 f"Cannot merge SNPObjects: Mismatch in the number of SNPs in `calldata_gt`.\n" 1140 f"`self.calldata_gt` has {self.calldata_gt.shape[0]} SNPs, " 1141 f"while `snpobj.calldata_gt` has {snpobj.calldata_gt.shape[0]} SNPs." 1142 ) 1143 if self.are_strands_summed and not snpobj.are_strands_summed: 1144 raise ValueError( 1145 "Cannot merge SNPObjects: `self` has summed strands, but `snpobj` does not.\n" 1146 "Ensure both objects have the same genotype summation state before merging." 1147 ) 1148 if not self.are_strands_summed and snpobj.are_strands_summed: 1149 raise ValueError( 1150 "Cannot merge SNPObjects: `snpobj` has summed strands, but `self` does not.\n" 1151 "Ensure both objects have the same genotype summation state before merging." 1152 ) 1153 calldata_gt = np.concatenate([self.calldata_gt, snpobj.calldata_gt], axis=1) 1154 else: 1155 calldata_gt = None 1156 1157 # Merge samples if present and compatible, handling duplicates if `force_samples=True` 1158 if self.samples is not None and snpobj.samples is not None: 1159 overlapping_samples = set(self.samples).intersection(set(snpobj.samples)) 1160 if overlapping_samples: 1161 if not force_samples: 1162 raise ValueError( 1163 f"Cannot merge SNPObjects: Found overlapping sample names {overlapping_samples}.\n" 1164 "Samples must be strictly non-overlapping. To allow merging with renaming, set `force_samples=True`." 1165 ) 1166 else: 1167 # Rename duplicate samples by prepending the file index 1168 renamed_samples = [f"{prefix}:{sample}" if sample in overlapping_samples else sample for sample in snpobj.samples] 1169 samples = np.concatenate([self.samples, renamed_samples], axis=0) 1170 else: 1171 samples = np.concatenate([self.samples, snpobj.samples], axis=0) 1172 else: 1173 samples = None 1174 1175 # Merge LAI data if present and compatible 1176 if self.calldata_lai is not None and snpobj.calldata_lai is not None: 1177 if self.calldata_lai.ndim != snpobj.calldata_lai.ndim: 1178 raise ValueError( 1179 f"Cannot merge SNPObjects: Mismatch in `calldata_lai` dimensions.\n" 1180 f"`self.calldata_lai` has {self.calldata_lai.ndim} dimensions, " 1181 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.ndim} dimensions." 1182 ) 1183 if self.calldata_lai.shape[0] != snpobj.calldata_lai.shape[0]: 1184 raise ValueError( 1185 f"Cannot merge SNPObjects: Mismatch in the number of SNPs in `calldata_lai`.\n" 1186 f"`self.calldata_lai` has {self.calldata_lai.shape[0]} SNPs, " 1187 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.shape[0]} SNPs." 1188 ) 1189 calldata_lai = np.concatenate([self.calldata_lai, snpobj.calldata_lai], axis=1) 1190 else: 1191 calldata_lai = None 1192 1193 if inplace: 1194 self.calldata_gt = calldata_gt 1195 self.calldata_lai = calldata_lai 1196 self.samples = samples 1197 return self 1198 1199 # Create and return a new SNPObject containing the merged samples 1200 return SNPObject( 1201 calldata_gt=calldata_gt, 1202 samples=samples, 1203 variants_ref=self.variants_ref, 1204 variants_alt=self.variants_alt, 1205 variants_chrom=self.variants_chrom, 1206 variants_filter_pass=self.variants_filter_pass, 1207 variants_id=self.variants_id, 1208 variants_pos=self.variants_pos, 1209 variants_qual=self.variants_qual, 1210 calldata_lai=calldata_lai, 1211 ancestry_map=self.ancestry_map 1212 )
Merge self
with snpobj
along the sample axis.
This method expects both SNPObjects to contain the same set of SNPs in the same order,
then combines their genotype (calldata_gt
) and LAI (calldata_lai
) arrays by
concatenating the sample dimension. Samples from snpobj
are appended to those in self
.
Arguments:
- snpobj (SNPObject): The SNPObject to merge samples with.
- force_samples (bool, default=False): If True, duplicate sample names are resolved by prepending the
prefix
to duplicate sample names insnpobj
. Otherwise, merging fails when duplicate sample names are found. Default is False. - prefix (str, default='2'): A string prepended to duplicate sample names in
snpobj
whenforce_samples=True
. Duplicates are renamed from<sample_name>
to<prefix>:<sample_name>
. For instance, ifprefix='2'
and there is a conflict with a sample called "sample_1", it becomes "2:sample_1". - inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the merged samples. Default is False.
Returns:
Optional[SNPObject]: A new SNPObject containing the merged sample data.
1214 def concat( 1215 self, 1216 snpobj: 'SNPObject', 1217 inplace: bool = False 1218 ) -> Optional['SNPObject']: 1219 """ 1220 Concatenate self with snpobj along the SNP axis. 1221 1222 This method expects both SNPObjects to contain the same set of samples in the same order, 1223 and that the chromosome(s) in snpobj follow (i.e. have higher numeric identifiers than) 1224 those in self. 1225 1226 Args: 1227 snpobj (SNPObject): 1228 The SNPObject to concatenate SNPs with. 1229 inplace (bool, default=False): 1230 If True, modifies `self` in place. If False, returns a new `SNPObject` with the concatenated SNPs. 1231 Default is False. 1232 1233 Returns: 1234 **Optional[SNPObject]**: A new SNPObject containing the concatenated SNP data. 1235 """ 1236 # Merge calldata_gt if present and compatible 1237 if self.calldata_gt is not None and snpobj.calldata_gt is not None: 1238 if self.calldata_gt.shape[1] != snpobj.calldata_gt.shape[1]: 1239 raise ValueError( 1240 f"Cannot merge SNPObjects: Mismatch in the number of samples in `calldata_gt`.\n" 1241 f"`self.calldata_gt` has {self.calldata_gt.shape[1]} samples, " 1242 f"while `snpobj.calldata_gt` has {snpobj.calldata_gt.shape[1]} samples." 1243 ) 1244 if self.are_strands_summed and not snpobj.are_strands_summed: 1245 raise ValueError( 1246 "Cannot merge SNPObjects: `self` has summed strands, but `snpobj` does not.\n" 1247 "Ensure both objects have the same genotype summation state before merging." 1248 ) 1249 if not self.are_strands_summed and snpobj.are_strands_summed: 1250 raise ValueError( 1251 "Cannot merge SNPObjects: `snpobj` has summed strands, but `self` does not.\n" 1252 "Ensure both objects have the same genotype summation state before merging." 1253 ) 1254 calldata_gt = np.concatenate([self.calldata_gt, snpobj.calldata_gt], axis=0) 1255 else: 1256 calldata_gt = None 1257 1258 # Merge SNP-related attributes if present 1259 attributes = [ 1260 'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 'variants_id', 'variants_pos', 'variants_qual' 1261 ] 1262 merged_attrs = {} 1263 for attr in attributes: 1264 self_attr = getattr(self, attr, None) 1265 obj_attr = getattr(snpobj, attr, None) 1266 1267 # Concatenate if both present 1268 if self_attr is not None and obj_attr is not None: 1269 merged_attrs[attr] = np.concatenate([self_attr, obj_attr], axis=0) 1270 else: 1271 # If either is None, store None 1272 merged_attrs[attr] = None 1273 1274 # Merge LAI data if present and compatible 1275 if self.calldata_lai is not None and snpobj.calldata_lai is not None: 1276 if self.calldata_lai.ndim != snpobj.calldata_lai.ndim: 1277 raise ValueError( 1278 f"Cannot merge SNPObjects: Mismatch in `calldata_lai` dimensions.\n" 1279 f"`self.calldata_lai` has {self.calldata_lai.ndim} dimensions, " 1280 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.ndim} dimensions." 1281 ) 1282 if self.calldata_lai.shape[1] != snpobj.calldata_lai.shape[1]: 1283 raise ValueError( 1284 f"Cannot merge SNPObjects: Mismatch in the number of samples in `calldata_lai`.\n" 1285 f"`self.calldata_lai` has {self.calldata_lai.shape[1]} samples, " 1286 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.shape[1]} samples." 1287 ) 1288 calldata_lai = np.concatenate([self.calldata_lai, snpobj.calldata_lai], axis=0) 1289 else: 1290 calldata_lai = None 1291 1292 if inplace: 1293 self.calldata_gt = calldata_gt 1294 self.calldata_lai = calldata_lai 1295 for attr in attributes: 1296 self[attr] = merged_attrs[attr] 1297 return self 1298 1299 # Create and return a new SNPObject containing the concatenated SNPs 1300 return SNPObject( 1301 calldata_gt=calldata_gt, 1302 calldata_lai=calldata_lai, 1303 samples=self.samples, 1304 variants_ref=merged_attrs['variants_ref'], 1305 variants_alt=merged_attrs['variants_alt'], 1306 variants_chrom=merged_attrs['variants_chrom'], 1307 variants_id=merged_attrs['variants_id'], 1308 variants_pos=merged_attrs['variants_pos'], 1309 variants_qual=merged_attrs['variants_qual'], 1310 variants_filter_pass=merged_attrs['variants_filter_pass'], 1311 ancestry_map=self.ancestry_map 1312 )
Concatenate self with snpobj along the SNP axis.
This method expects both SNPObjects to contain the same set of samples in the same order, and that the chromosome(s) in snpobj follow (i.e. have higher numeric identifiers than) those in self.
Arguments:
- snpobj (SNPObject): The SNPObject to concatenate SNPs with.
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the concatenated SNPs. Default is False.
Returns:
Optional[SNPObject]: A new SNPObject containing the concatenated SNP data.
1314 def remove_strand_ambiguous_variants(self, inplace: bool = False) -> Optional['SNPObject']: 1315 """ 1316 A strand-ambiguous variant has reference (`variants_ref`) and alternate (`variants_alt`) alleles 1317 in the pairs A/T, T/A, C/G, or G/C, where both alleles are complementary and thus indistinguishable 1318 in terms of strand orientation. 1319 1320 Args: 1321 inplace (bool, default=False): 1322 If True, modifies `self` in place. If False, returns a new `SNPObject` with the 1323 strand-ambiguous variants removed. Default is False. 1324 1325 Returns: 1326 **Optional[SNPObject]:** A new `SNPObject` with non-ambiguous variants only if `inplace=False`. 1327 If `inplace=True`, modifies `self` in place and returns None. 1328 """ 1329 # Identify strand-ambiguous SNPs using vectorized comparisons 1330 is_AT = (self['variants_ref'] == 'A') & (self['variants_alt'] == 'T') 1331 is_TA = (self['variants_ref'] == 'T') & (self['variants_alt'] == 'A') 1332 is_CG = (self['variants_ref'] == 'C') & (self['variants_alt'] == 'G') 1333 is_GC = (self['variants_ref'] == 'G') & (self['variants_alt'] == 'C') 1334 1335 # Create a combined mask for all ambiguous variants 1336 ambiguous_mask = is_AT | is_TA | is_CG | is_GC 1337 non_ambiguous_idx = np.where(~ambiguous_mask)[0] 1338 1339 # Count each type of ambiguity using numpy's sum on boolean arrays 1340 A_T_count = np.sum(is_AT) 1341 T_A_count = np.sum(is_TA) 1342 C_G_count = np.sum(is_CG) 1343 G_C_count = np.sum(is_GC) 1344 1345 # Log the counts of each type of strand-ambiguous variants 1346 total_ambiguous = A_T_count + T_A_count + C_G_count + G_C_count 1347 log.info(f'{A_T_count} ambiguities of A-T type.') 1348 log.info(f'{T_A_count} ambiguities of T-A type.') 1349 log.info(f'{C_G_count} ambiguities of C-G type.') 1350 log.info(f'{G_C_count} ambiguities of G-C type.') 1351 1352 # Filter out ambiguous variants and keep non-ambiguous ones 1353 log.debug(f'Removing {total_ambiguous} strand-ambiguous variants...') 1354 return self.filter_variants(indexes=non_ambiguous_idx, include=True, inplace=inplace)
A strand-ambiguous variant has reference (variants_ref
) and alternate (variants_alt
) alleles
in the pairs A/T, T/A, C/G, or G/C, where both alleles are complementary and thus indistinguishable
in terms of strand orientation.
Arguments:
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with the strand-ambiguous variants removed. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with non-ambiguous variants only ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1356 def correct_flipped_variants( 1357 self, 1358 snpobj: 'SNPObject', 1359 check_complement: bool = True, 1360 index_by: str = 'pos', 1361 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1362 log_stats: bool = True, 1363 inplace: bool = False 1364 ) -> Optional['SNPObject']: 1365 """ 1366 Correct flipped variants between between `self` and a reference `snpobj`, where reference (`variants_ref`) 1367 and alternate (`variants_alt`) alleles are swapped. 1368 1369 **Flip Detection Based on `check_complement`:** 1370 1371 - If `check_complement=False`, only direct allele swaps are considered: 1372 1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1373 1374 - If `check_complement=True`, both direct and complementary swaps are considered, with four possible cases: 1375 1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1376 2. **Complement Swap of Ref:** `complement(self.variants_ref) == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1377 3. **Complement Swap of Alt:** `self.variants_ref == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`. 1378 4. **Complement Swap of both Ref and Alt:** `complement(self.variants_ref) == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`. 1379 1380 **Note:** Variants where `self.variants_ref == self.variants_alt` are ignored as they are ambiguous. 1381 1382 **Correction Process:** 1383 - Swaps `variants_ref` and `variants_alt` alleles in `self` to align with `snpobj`. 1384 - Flips `calldata_gt` values (0 becomes 1, and 1 becomes 0) to match the updated allele configuration. 1385 1386 Args: 1387 snpobj (SNPObject): 1388 The reference SNPObject to compare against. 1389 check_complement (bool, default=True): 1390 If True, also checks for complementary base pairs (A/T, T/A, C/G, and G/C) when identifying swapped variants. 1391 Default is True. 1392 index_by (str, default='pos'): 1393 Criteria for matching variants. Options: 1394 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1395 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1396 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1397 Default is 'pos'. 1398 common_variants_intersection (tuple of arrays, optional): 1399 Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 1400 computed within the function. 1401 log_stats (bool, default=True): 1402 If True, logs statistical information about matching and ambiguous alleles. Default is True. 1403 inplace (bool, default=False): 1404 If True, modifies `self` in place. If False, returns a new `SNPObject` with corrected 1405 flips. Default is False. 1406 1407 Returns: 1408 **Optional[SNPObject]**: 1409 A new `SNPObject` with corrected flips if `inplace=False`. 1410 If `inplace=True`, modifies `self` in place and returns None. 1411 """ 1412 # Define complement mappings for nucleotides 1413 complement_map = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'} 1414 1415 # Helper function to get the complement of a base 1416 def get_complement(base: str) -> str: 1417 return complement_map.get(base, base) 1418 1419 # Get common variant indices if not provided 1420 if common_variants_intersection != None: 1421 query_idx, reference_idx = common_variants_intersection 1422 else: 1423 _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by) 1424 1425 # Log statistics on matching alleles if enabled 1426 if log_stats: 1427 matching_ref = np.sum(self['variants_ref'][query_idx] == snpobj['variants_ref'][reference_idx]) 1428 matching_alt = np.sum(self['variants_alt'][query_idx] == snpobj['variants_alt'][reference_idx]) 1429 ambiguous = np.sum(self['variants_ref'][query_idx] == self['variants_alt'][query_idx]) 1430 log.info(f"Matching reference alleles (ref=ref'): {matching_ref}, Matching alternate alleles (alt=alt'): {matching_alt}.") 1431 log.info(f"Number of ambiguous alleles (ref=alt): {ambiguous}.") 1432 1433 # Identify indices where `ref` and `alt` alleles are swapped 1434 if not check_complement: 1435 # Simple exact match for swapped alleles 1436 swapped_ref = (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) 1437 swapped_alt = (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) 1438 else: 1439 # Check for swapped or complementary-swapped alleles 1440 swapped_ref = ( 1441 (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) | 1442 (np.vectorize(get_complement)(self['variants_ref'][query_idx]) == snpobj['variants_alt'][reference_idx]) 1443 ) 1444 swapped_alt = ( 1445 (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) | 1446 (np.vectorize(get_complement)(self['variants_alt'][query_idx]) == snpobj['variants_ref'][reference_idx]) 1447 ) 1448 1449 # Filter out ambiguous variants where `ref` and `alt` alleles match (ref=alt) 1450 not_ambiguous = (self['variants_ref'][query_idx] != self['variants_alt'][query_idx]) 1451 1452 # Indices in `self` of flipped variants 1453 flip_idx_query = query_idx[swapped_ref & swapped_alt & not_ambiguous] 1454 1455 # Correct the identified variant flips 1456 if len(flip_idx_query) > 0: 1457 log.info(f'Correcting {len(flip_idx_query)} variant flips...') 1458 1459 temp_alts = self['variants_alt'][flip_idx_query] 1460 temp_refs = self['variants_ref'][flip_idx_query] 1461 1462 # Correct the variant flips based on whether the operation is in-place or not 1463 if inplace: 1464 self['variants_alt'][flip_idx_query] = temp_refs 1465 self['variants_ref'][flip_idx_query] = temp_alts 1466 self['calldata_gt'][flip_idx_query] = 1 - self['calldata_gt'][flip_idx_query] 1467 return None 1468 else: 1469 snpobj = self.copy() 1470 snpobj['variants_alt'][flip_idx_query] = temp_refs 1471 snpobj['variants_ref'][flip_idx_query] = temp_alts 1472 snpobj['calldata_gt'][flip_idx_query] = 1 - snpobj['calldata_gt'][flip_idx_query] 1473 return snpobj 1474 else: 1475 log.info('No variant flips found to correct.') 1476 return self if not inplace else None
Correct flipped variants between between self
and a reference snpobj
, where reference (variants_ref
)
and alternate (variants_alt
) alleles are swapped.
Flip Detection Based on check_complement
:
If
check_complement=False
, only direct allele swaps are considered:- Direct Swap:
self.variants_ref == snpobj.variants_alt
andself.variants_alt == snpobj.variants_ref
.
- Direct Swap:
If
check_complement=True
, both direct and complementary swaps are considered, with four possible cases:- Direct Swap:
self.variants_ref == snpobj.variants_alt
andself.variants_alt == snpobj.variants_ref
. - Complement Swap of Ref:
complement(self.variants_ref) == snpobj.variants_alt
andself.variants_alt == snpobj.variants_ref
. - Complement Swap of Alt:
self.variants_ref == snpobj.variants_alt
andcomplement(self.variants_alt) == snpobj.variants_ref
. - Complement Swap of both Ref and Alt:
complement(self.variants_ref) == snpobj.variants_alt
andcomplement(self.variants_alt) == snpobj.variants_ref
.
- Direct Swap:
Note: Variants where self.variants_ref == self.variants_alt
are ignored as they are ambiguous.
Correction Process:
- Swaps
variants_ref
andvariants_alt
alleles inself
to align withsnpobj
. - Flips
calldata_gt
values (0 becomes 1, and 1 becomes 0) to match the updated allele configuration.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- check_complement (bool, default=True): If True, also checks for complementary base pairs (A/T, T/A, C/G, and G/C) when identifying swapped variants. Default is True.
- index_by (str, default='pos'): Criteria for matching variants. Options:
'pos'
: Matches by chromosome and position (variants_chrom
,variants_pos
), e.g., 'chr1-12345'.'id'
: Matches by variant ID alone (variants_id
), e.g., 'rs123'.'pos+id'
: Matches by chromosome, position, and ID (variants_chrom
,variants_pos
,variants_id
), e.g., 'chr1-12345-rs123'. Default is 'pos'.
- common_variants_intersection (tuple of arrays, optional): Precomputed indices of common variants between
self
andsnpobj
. If None, intersection is computed within the function. - log_stats (bool, default=True): If True, logs statistical information about matching and ambiguous alleles. Default is True.
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with corrected flips. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with corrected flips ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1478 def remove_mismatching_variants( 1479 self, 1480 snpobj: 'SNPObject', 1481 index_by: str = 'pos', 1482 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1483 inplace: bool = False 1484 ) -> Optional['SNPObject']: 1485 """ 1486 Remove variants from `self`, where reference (`variants_ref`) and/or alternate (`variants_alt`) alleles 1487 do not match with a reference `snpobj`. 1488 1489 Args: 1490 snpobj (SNPObject): 1491 The reference SNPObject to compare against. 1492 index_by (str, default='pos'): 1493 Criteria for matching variants. Options: 1494 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1495 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1496 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1497 Default is 'pos'. 1498 common_variants_intersection (tuple of arrays, optional): 1499 Precomputed indices of common variants between `self` and the reference `snpobj`. 1500 If None, the intersection is computed within the function. 1501 inplace (bool, default=False): 1502 If True, modifies `self` in place. If False, returns a new `SNPObject` without 1503 mismatching variants. Default is False. 1504 1505 Returns: 1506 **Optional[SNPObject]:** 1507 A new `SNPObject` without mismatching variants if `inplace=False`. 1508 If `inplace=True`, modifies `self` in place and returns None. 1509 """ 1510 # Get common variant indices if not provided 1511 if common_variants_intersection is not None: 1512 query_idx, reference_idx = common_variants_intersection 1513 else: 1514 _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by) 1515 1516 # Vectorized comparison of `ref` and `alt` alleles 1517 ref_mismatch = self['variants_ref'][query_idx] != snpobj['variants_ref'][reference_idx] 1518 alt_mismatch = self['variants_alt'][query_idx] != snpobj['variants_alt'][reference_idx] 1519 mismatch_mask = ref_mismatch | alt_mismatch 1520 1521 # Identify indices in `self` of mismatching variants 1522 mismatch_idx = query_idx[mismatch_mask] 1523 1524 # Compute total number of variant mismatches 1525 total_mismatches = np.sum(mismatch_mask) 1526 1527 # Filter out mismatching variants 1528 log.debug(f'Removing {total_mismatches} mismatching variants...') 1529 return self.filter_variants(indexes=mismatch_idx, include=True, inplace=inplace)
Remove variants from self
, where reference (variants_ref
) and/or alternate (variants_alt
) alleles
do not match with a reference snpobj
.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- index_by (str, default='pos'): Criteria for matching variants. Options:
'pos'
: Matches by chromosome and position (variants_chrom
,variants_pos
), e.g., 'chr1-12345'.'id'
: Matches by variant ID alone (variants_id
), e.g., 'rs123'.'pos+id'
: Matches by chromosome, position, and ID (variants_chrom
,variants_pos
,variants_id
), e.g., 'chr1-12345-rs123'. Default is 'pos'.
- common_variants_intersection (tuple of arrays, optional): Precomputed indices of common variants between
self
and the referencesnpobj
. If None, the intersection is computed within the function. - inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
without mismatching variants. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
without mismatching variants ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1531 def shuffle_variants(self, inplace: bool = False) -> Optional['SNPObject']: 1532 """ 1533 Randomly shuffle the positions of variants in the SNPObject, ensuring that all associated 1534 data (e.g., `calldata_gt` and variant-specific attributes) remain aligned. 1535 1536 Args: 1537 inplace (bool, default=False): 1538 If True, modifies `self` in place. If False, returns a new `SNPObject` with 1539 shuffled variants. Default is False. 1540 1541 Returns: 1542 **Optional[SNPObject]:** 1543 A new `SNPObject` without shuffled variant positions if `inplace=False`. 1544 If `inplace=True`, modifies `self` in place and returns None. 1545 """ 1546 # Generate a random permutation index for shuffling variant positions 1547 shuffle_index = np.random.permutation(self.n_snps) 1548 1549 # Apply shuffling to all relevant attributes using the class's dictionary-like interface 1550 if inplace: 1551 for key in self.keys(): 1552 if self[key] is not None: 1553 if key == 'calldata_gt': 1554 # `calldata_gt`` has a different shape, so it's shuffled along axis 0 1555 self[key] = self[key][shuffle_index, ...] 1556 elif 'variant' in key: 1557 # snpobj attributes are 1D arrays 1558 self[key] = np.asarray(self[key])[shuffle_index] 1559 return None 1560 else: 1561 shuffled_snpobj = self.copy() 1562 for key in shuffled_snpobj.keys(): 1563 if shuffled_snpobj[key] is not None: 1564 if key == 'calldata_gt': 1565 shuffled_snpobj[key] = shuffled_snpobj[key][shuffle_index, ...] 1566 elif 'variant' in key: 1567 shuffled_snpobj[key] = np.asarray(shuffled_snpobj[key])[shuffle_index] 1568 return shuffled_snpobj
Randomly shuffle the positions of variants in the SNPObject, ensuring that all associated
data (e.g., calldata_gt
and variant-specific attributes) remain aligned.
Arguments:
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with shuffled variants. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
without shuffled variant positions ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1570 def set_empty_to_missing(self, inplace: bool = False) -> Optional['SNPObject']: 1571 """ 1572 Replace empty strings `''` with missing values `'.'` in attributes of `self`. 1573 1574 Args: 1575 inplace (bool, default=False): 1576 If True, modifies `self` in place. If False, returns a new `SNPObject` with empty 1577 strings `''` replaced by missing values `'.'`. Default is False. 1578 1579 Returns: 1580 **Optional[SNPObject]:** 1581 A new `SNPObject` with empty strings replaced if `inplace=False`. 1582 If `inplace=True`, modifies `self` in place and returns None. 1583 """ 1584 if inplace: 1585 if self.variants_alt is not None: 1586 self.variants_alt[self.variants_alt == ''] = '.' 1587 if self.variants_ref is not None: 1588 self.variants_ref[self.variants_ref == ''] = '.' 1589 if self.variants_qual is not None: 1590 self.variants_qual = self.variants_qual.astype(str) 1591 self.variants_qual[(self.variants_qual == '') | (self.variants_qual == 'nan')] = '.' 1592 if self.variants_chrom is not None: 1593 self.variants_chrom = self.variants_chrom.astype(str) 1594 self.variants_chrom[self.variants_chrom == ''] = '.' 1595 if self.variants_filter_pass is not None: 1596 self.variants_filter_pass[self.variants_filter_pass == ''] = '.' 1597 if self.variants_id is not None: 1598 self.variants_id[self.variants_id == ''] = '.' 1599 return self 1600 else: 1601 snpobj = self.copy() 1602 if snpobj.variants_alt is not None: 1603 snpobj.variants_alt[snpobj.variants_alt == ''] = '.' 1604 if snpobj.variants_ref is not None: 1605 snpobj.variants_ref[snpobj.variants_ref == ''] = '.' 1606 if snpobj.variants_qual is not None: 1607 snpobj.variants_qual = snpobj.variants_qual.astype(str) 1608 snpobj.variants_qual[(snpobj.variants_qual == '') | (snpobj.variants_qual == 'nan')] = '.' 1609 if snpobj.variants_chrom is not None: 1610 snpobj.variants_chrom[snpobj.variants_chrom == ''] = '.' 1611 if snpobj.variants_filter_pass is not None: 1612 snpobj.variants_filter_pass[snpobj.variants_filter_pass == ''] = '.' 1613 if snpobj.variants_id is not None: 1614 snpobj.variants_id[snpobj.variants_id == ''] = '.' 1615 return snpobj
Replace empty strings ''
with missing values '.'
in attributes of self
.
Arguments:
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newSNPObject
with empty strings''
replaced by missing values'.'
. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObject
with empty strings replaced ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
1617 def convert_to_window_level( 1618 self, 1619 window_size: Optional[int] = None, 1620 physical_pos: Optional[np.ndarray] = None, 1621 chromosomes: Optional[np.ndarray] = None, 1622 window_sizes: Optional[np.ndarray] = None, 1623 laiobj: Optional['LocalAncestryObject'] = None 1624 ) -> 'LocalAncestryObject': 1625 """ 1626 Aggregate the `calldata_lai` attribute into genomic windows within a 1627 `snputils.ancestry.genobj.LocalAncestryObject`. 1628 1629 **Options for defining windows (in order of precedence):** 1630 1631 1. **Fixed window size**: 1632 - Use `window_size` to specify how many SNPs go into each window. The last window on each 1633 chromosome may be larger if SNPs are not evenly divisible by the size. 1634 1635 2. **Custom start and end positions**: 1636 - Provide `physical_pos` (2D array of shape (n_windows, 2)) as the [start, end] base-pair 1637 coordinates for each window. 1638 - If `chromosomes` is not provided and `self` has exactly one chromosome, all windows are 1639 assumed to belong to that chromosome. 1640 - If multiple chromosomes exist but `chromosomes` is missing, an error will be raised. 1641 - Optionally, provide `window_sizes` to store the SNP count per-window. 1642 1643 3. **Matching existing windows**: 1644 - Reuse window definitions (`physical_pos`, `chromosomes`, `window_sizes`) from an existing `laiobj`. 1645 1646 Args: 1647 window_size (int, optional): 1648 Number of SNPs in each window if defining fixed-size windows. If the total number of 1649 SNPs in a chromosome is not evenly divisible by the window size, the last window on that 1650 chromosome will include all remaining SNPs and therefore be larger than the specified size. 1651 physical_pos (array of shape (n_windows, 2), optional): 1652 A 2D array containing the start and end physical positions for each window. 1653 chromosomes (array of shape (n_windows,), optional): 1654 An array with chromosome numbers corresponding to each genomic window. 1655 window_sizes (array of shape (n_windows,), optional): 1656 An array specifying the number of SNPs in each genomic window. 1657 laiobj (LocalAncestryObject, optional): 1658 A reference `LocalAncestryObject` from which to copy existing window definitions. 1659 1660 Returns: 1661 **LocalAncestryObject:** 1662 A LocalAncestryObject containing window-level ancestry data. 1663 """ 1664 from snputils.ancestry.genobj.local import LocalAncestryObject 1665 1666 if window_size is None and physical_pos is None and laiobj is None: 1667 raise ValueError("One of `window_size`, `physical_pos`, or `laiobj` must be provided.") 1668 1669 # Fixed window size 1670 if window_size is not None: 1671 physical_pos = [] # Boundaries [start, end] of each window 1672 chromosomes = [] # Chromosome for each window 1673 window_sizes = [] # Number of SNPs for each window 1674 for chrom in self.unique_chrom: 1675 # Extract indices corresponding to this chromosome 1676 mask_chrom = (self.variants_chrom == chrom) 1677 # Subset to this chromosome 1678 pos_chrom = self.variants_pos[mask_chrom] 1679 # Number of SNPs for this chromosome 1680 n_snps_chrom = pos_chrom.size 1681 1682 # Initialize the start of the first window with the position of the first SNP 1683 current_start = self.variants_pos[0] 1684 1685 # Number of full windows with exactly `window_size` SNPs 1686 n_full_windows = n_snps_chrom // window_size 1687 1688 # Build all but the last window 1689 for i in range(n_full_windows-1): 1690 current_end = self.variants_pos[(i+1) * window_size - 1] 1691 physical_pos.append([current_start, current_end]) 1692 chromosomes.append(chrom) 1693 window_sizes.append(window_size) 1694 current_start = self.variants_pos[(i+1) * window_size] 1695 1696 # Build the last window 1697 current_end = self.variants_pos[-1] 1698 physical_pos.append([current_start, current_end]) 1699 chromosomes.append(chrom) 1700 window_sizes.append(n_snps_chrom - ((n_full_windows - 1) * window_size)) 1701 1702 physical_pos = np.array(physical_pos) 1703 chromosomes = np.array(chromosomes) 1704 window_sizes = np.array(window_sizes) 1705 1706 # Custom start and end positions 1707 elif physical_pos is not None: 1708 # Check if there is exactly one chromosome 1709 if chromosomes is None: 1710 unique_chrom = self.unique_chrom 1711 if len(unique_chrom) == 1: 1712 # We assume all windows belong to this single chromosome 1713 single_chrom = unique_chrom[0] 1714 chromosomes = np.array([single_chrom] * physical_pos.shape[0]) 1715 else: 1716 raise ValueError("Multiple chromosomes detected, but `chromosomes` was not provided.") 1717 1718 # Match existing windows to a reference laiobj 1719 elif laiobj is not None: 1720 physical_pos = laiobj.physical_pos 1721 chromosomes = laiobj.chromosomes 1722 window_sizes = laiobj.window_sizes 1723 1724 # Allocate an output LAI array 1725 n_windows = physical_pos.shape[0] 1726 n_samples = self.n_samples 1727 if self.calldata_lai.ndim == 3: 1728 lai = np.zeros((n_windows, n_samples, 2)) 1729 else: 1730 lai = np.zeros((n_windows, n_samples*2)) 1731 1732 # For each window, find the relevant SNPs and compute the mode of the ancestries 1733 for i, ((start, end), chrom) in enumerate(zip(physical_pos, chromosomes)): 1734 snps_mask = ( 1735 (self.variants_chrom == chrom) & 1736 (self.variants_pos >= start) & 1737 (self.variants_pos <= end) 1738 ) 1739 if np.any(snps_mask): 1740 lai_mask = self.calldata_lai[snps_mask, ...] 1741 mode_ancestries = mode(lai_mask, axis=0, nan_policy='omit').mode 1742 lai[i] = mode_ancestries 1743 else: 1744 lai[i] = np.nan 1745 1746 # Generate haplotype labels, e.g. "Sample1.0", "Sample1.1" 1747 haplotypes = [f"{sample}.{i}" for sample in self.samples for i in range(2)] 1748 1749 # If original data was (n_snps, n_samples, 2), flatten to (n_windows, n_samples*2) 1750 if self.calldata_lai.ndim == 3: 1751 lai = lai.reshape(n_windows, -1) 1752 1753 # Aggregate into a LocalAncestryObject 1754 return LocalAncestryObject( 1755 haplotypes=haplotypes, 1756 lai=lai, 1757 samples=self.samples, 1758 ancestry_map=self.ancestry_map, 1759 window_sizes=window_sizes, 1760 physical_pos=physical_pos, 1761 chromosomes=chromosomes 1762 )
Aggregate the calldata_lai
attribute into genomic windows within a
snputils.ancestry.genobj.LocalAncestryObject
.
Options for defining windows (in order of precedence):
- Fixed window size:
- Use
window_size
to specify how many SNPs go into each window. The last window on each chromosome may be larger if SNPs are not evenly divisible by the size.
- Custom start and end positions:
- Provide
physical_pos
(2D array of shape (n_windows, 2)) as the [start, end] base-pair coordinates for each window. - If
chromosomes
is not provided andself
has exactly one chromosome, all windows are assumed to belong to that chromosome. - If multiple chromosomes exist but
chromosomes
is missing, an error will be raised. - Optionally, provide
window_sizes
to store the SNP count per-window.
- Matching existing windows:
- Reuse window definitions (
physical_pos
,chromosomes
,window_sizes
) from an existinglaiobj
.
Arguments:
- window_size (int, optional): Number of SNPs in each window if defining fixed-size windows. If the total number of SNPs in a chromosome is not evenly divisible by the window size, the last window on that chromosome will include all remaining SNPs and therefore be larger than the specified size.
- physical_pos (array of shape (n_windows, 2), optional): A 2D array containing the start and end physical positions for each window.
- chromosomes (array of shape (n_windows,), optional): An array with chromosome numbers corresponding to each genomic window.
- window_sizes (array of shape (n_windows,), optional): An array specifying the number of SNPs in each genomic window.
- laiobj (LocalAncestryObject, optional): A reference
LocalAncestryObject
from which to copy existing window definitions.
Returns:
LocalAncestryObject: A LocalAncestryObject containing window-level ancestry data.
1764 def save(self, file: Union[str, Path]) -> None: 1765 """ 1766 Save the data stored in `self` to a specified file. 1767 1768 The format of the saved file is determined by the file extension provided in the `file` 1769 argument. 1770 1771 **Supported formats:** 1772 1773 - `.bed`: Binary PED (Plink) format. 1774 - `.pgen`: Plink2 binary genotype format. 1775 - `.vcf`: Variant Call Format. 1776 - `.pkl`: Pickle format for saving `self` in serialized form. 1777 1778 Args: 1779 file (str or pathlib.Path): 1780 Path to the file where the data will be saved. The extension of the file determines the save format. 1781 Supported extensions: `.bed`, `.pgen`, `.vcf`, `.pkl`. 1782 """ 1783 ext = Path(file).suffix.lower() 1784 if ext == '.bed': 1785 self.save_bed(file) 1786 elif ext == '.pgen': 1787 self.save_pgen(file) 1788 elif ext == '.vcf': 1789 self.save_vcf(file) 1790 elif ext == '.pkl': 1791 self.save_pickle(file) 1792 else: 1793 raise ValueError(f"Unsupported file extension: {ext}")
Save the data stored in self
to a specified file.
The format of the saved file is determined by the file extension provided in the file
argument.
Supported formats:
.bed
: Binary PED (Plink) format..pgen
: Plink2 binary genotype format..vcf
: Variant Call Format..pkl
: Pickle format for savingself
in serialized form.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. The extension of the file determines the save format.
Supported extensions:
.bed
,.pgen
,.vcf
,.pkl
.
1795 def save_bed(self, file: Union[str, Path]) -> None: 1796 """ 1797 Save the data stored in `self` to a `.bed` file. 1798 1799 Args: 1800 file (str or pathlib.Path): 1801 Path to the file where the data will be saved. It should end with `.bed`. 1802 If the provided path does not have this extension, it will be appended. 1803 """ 1804 from snputils.snp.io.write.bed import BEDWriter 1805 writer = BEDWriter(snpobj=self, filename=file) 1806 writer.write()
Save the data stored in self
to a .bed
file.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.bed
. If the provided path does not have this extension, it will be appended.
1808 def save_pgen(self, file: Union[str, Path]) -> None: 1809 """ 1810 Save the data stored in `self` to a `.pgen` file. 1811 1812 Args: 1813 file (str or pathlib.Path): 1814 Path to the file where the data will be saved. It should end with `.pgen`. 1815 If the provided path does not have this extension, it will be appended. 1816 """ 1817 from snputils.snp.io.write.pgen import PGENWriter 1818 writer = PGENWriter(snpobj=self, filename=file) 1819 writer.write()
Save the data stored in self
to a .pgen
file.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.pgen
. If the provided path does not have this extension, it will be appended.
1821 def save_vcf(self, file: Union[str, Path]) -> None: 1822 """ 1823 Save the data stored in `self` to a `.vcf` file. 1824 1825 Args: 1826 file (str or pathlib.Path): 1827 Path to the file where the data will be saved. It should end with `.vcf`. 1828 If the provided path does not have this extension, it will be appended. 1829 """ 1830 from snputils.snp.io.write.vcf import VCFWriter 1831 writer = VCFWriter(snpobj=self, filename=file) 1832 writer.write()
Save the data stored in self
to a .vcf
file.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.vcf
. If the provided path does not have this extension, it will be appended.
1834 def save_pickle(self, file: Union[str, Path]) -> None: 1835 """ 1836 Save `self` in serialized form to a `.pkl` file. 1837 1838 Args: 1839 file (str or pathlib.Path): 1840 Path to the file where the data will be saved. It should end with `.pkl`. 1841 If the provided path does not have this extension, it will be appended. 1842 """ 1843 import pickle 1844 with open(file, 'wb') as file: 1845 pickle.dump(self, file)
Save self
in serialized form to a .pkl
file.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.pkl
. If the provided path does not have this extension, it will be appended.