snputils.snp.genobj
17class SNPObject: 18 """ 19 A class for Single Nucleotide Polymorphism (SNP) data, with optional support for 20 SNP-level Local Ancestry Information (LAI). 21 """ 22 def __init__( 23 self, 24 calldata_gt: Optional[np.ndarray] = None, 25 samples: Optional[np.ndarray] = None, 26 variants_ref: Optional[np.ndarray] = None, 27 variants_alt: Optional[np.ndarray] = None, 28 variants_chrom: Optional[np.ndarray] = None, 29 variants_filter_pass: Optional[np.ndarray] = None, 30 variants_id: Optional[np.ndarray] = None, 31 variants_pos: Optional[np.ndarray] = None, 32 variants_qual: Optional[np.ndarray] = None, 33 calldata_lai: Optional[np.ndarray] = None, 34 ancestry_map: Optional[Dict[str, str]] = None 35 ) -> None: 36 """ 37 Args: 38 calldata_gt (array, optional): 39 An array containing genotype data for each sample. This array can be either 2D with shape 40 `(n_snps, n_samples)` if the paternal and maternal strands are summed, or 3D with shape 41 `(n_snps, n_samples, 2)` if the strands are kept separate. 42 samples (array of shape (n_sampels,), optional): 43 An array containing unique sample identifiers. 44 variants_ref (array of shape (n_snps,), optional): 45 An array containing the reference allele for each SNP. 46 variants_alt (array of shape (n_snps,), optional): 47 An array containing the alternate allele for each SNP. 48 variants_chrom (array of shape (n_snps,), optional): 49 An array containing the chromosome for each SNP. 50 variants_filter_pass (array of shape (n_snps,), optional): 51 An array indicating whether each SNP passed control checks. 52 variants_id (array of shape (n_snps,), optional): 53 An array containing unique identifiers (IDs) for each SNP. 54 variants_pos (array of shape (n_snps,), optional): 55 An array containing the chromosomal positions for each SNP. 56 variants_qual (array of shape (n_snps,), optional): 57 An array containing the Phred-scaled quality score for each SNP. 58 calldata_lai (array, optional): 59 An array containing the ancestry for each SNP. This array can be either 2D with shape 60 `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2). 61 ancestry_map (dict of str to str, optional): 62 A dictionary mapping ancestry codes to region names. 63 """ 64 self.__calldata_gt = calldata_gt 65 self.__samples = samples 66 self.__variants_ref = variants_ref 67 self.__variants_alt = variants_alt 68 self.__variants_chrom = variants_chrom 69 self.__variants_filter_pass = variants_filter_pass 70 self.__variants_id = variants_id 71 self.__variants_pos = variants_pos 72 self.__variants_qual = variants_qual 73 self.__calldata_lai = calldata_lai 74 self.__ancestry_map = ancestry_map 75 76 self._sanity_check() 77 78 def __getitem__(self, key: str) -> Any: 79 """ 80 To access an attribute of the class using the square bracket notation, 81 similar to a dictionary. 82 """ 83 try: 84 return getattr(self, key) 85 except: 86 raise KeyError(f'Invalid key: {key}.') 87 88 def __setitem__(self, key: str, value: Any): 89 """ 90 To set an attribute of the class using the square bracket notation, 91 similar to a dictionary. 92 """ 93 try: 94 setattr(self, key, value) 95 except: 96 raise KeyError(f'Invalid key: {key}.') 97 98 @property 99 def calldata_gt(self) -> np.ndarray: 100 """ 101 Retrieve `calldata_gt`. 102 103 Returns: 104 **array:** 105 An array containing genotype data for each sample. This array can be either 2D with shape 106 `(n_snps, n_samples)` if the paternal and maternal strands are summed, or 3D with shape 107 `(n_snps, n_samples, 2)` if the strands are kept separate. 108 """ 109 return self.__calldata_gt 110 111 @calldata_gt.setter 112 def calldata_gt(self, x: np.ndarray): 113 """ 114 Update `calldata_gt`. 115 """ 116 self.__calldata_gt = x 117 118 @property 119 def samples(self) -> Optional[np.ndarray]: 120 """ 121 Retrieve `samples`. 122 123 Returns: 124 **array of shape (n_sampels,):** 125 An array containing unique sample identifiers. 126 """ 127 return self.__samples 128 129 @samples.setter 130 def samples(self, x: Union[List, np.ndarray]): 131 """ 132 Update `samples`. 133 """ 134 self.__samples = np.asarray(x) 135 136 @property 137 def variants_ref(self) -> Optional[np.ndarray]: 138 """ 139 Retrieve `variants_ref`. 140 141 Returns: 142 **array of shape (n_snps,):** An array containing the reference allele for each SNP. 143 """ 144 return self.__variants_ref 145 146 @variants_ref.setter 147 def variants_ref(self, x: np.ndarray): 148 """ 149 Update `variants_ref`. 150 """ 151 self.__variants_ref = x 152 153 @property 154 def variants_alt(self) -> Optional[np.ndarray]: 155 """ 156 Retrieve `variants_alt`. 157 158 Returns: 159 **array of shape (n_snps,):** An array containing the alternate allele for each SNP. 160 """ 161 return self.__variants_alt 162 163 @variants_alt.setter 164 def variants_alt(self, x: np.ndarray): 165 """ 166 Update `variants_alt`. 167 """ 168 self.__variants_alt = x 169 170 @property 171 def variants_chrom(self) -> Optional[np.ndarray]: 172 """ 173 Retrieve `variants_chrom`. 174 175 Returns: 176 **array of shape (n_snps,):** An array containing the chromosome for each SNP. 177 """ 178 return self.__variants_chrom 179 180 @variants_chrom.setter 181 def variants_chrom(self, x: np.ndarray): 182 """ 183 Update `variants_chrom`. 184 """ 185 self.__variants_chrom = x 186 187 @property 188 def variants_filter_pass(self) -> Optional[np.ndarray]: 189 """ 190 Retrieve `variants_filter_pass`. 191 192 Returns: 193 **array of shape (n_snps,):** An array indicating whether each SNP passed control checks. 194 """ 195 return self.__variants_filter_pass 196 197 @variants_filter_pass.setter 198 def variants_filter_pass(self, x: np.ndarray): 199 """ 200 Update `variants_filter_pass`. 201 """ 202 self.__variants_filter_pass = x 203 204 @property 205 def variants_id(self) -> Optional[np.ndarray]: 206 """ 207 Retrieve `variants_id`. 208 209 Returns: 210 **array of shape (n_snps,):** An array containing unique identifiers (IDs) for each SNP. 211 """ 212 return self.__variants_id 213 214 @variants_id.setter 215 def variants_id(self, x: np.ndarray): 216 """ 217 Update `variants_id`. 218 """ 219 self.__variants_id = x 220 221 @property 222 def variants_pos(self) -> Optional[np.ndarray]: 223 """ 224 Retrieve `variants_pos`. 225 226 Returns: 227 **array of shape (n_snps,):** An array containing the chromosomal positions for each SNP. 228 """ 229 return self.__variants_pos 230 231 @variants_pos.setter 232 def variants_pos(self, x: np.ndarray): 233 """ 234 Update `variants_pos`. 235 """ 236 self.__variants_pos = x 237 238 @property 239 def variants_qual(self) -> Optional[np.ndarray]: 240 """ 241 Retrieve `variants_qual`. 242 243 Returns: 244 **array of shape (n_snps,):** An array containing the Phred-scaled quality score for each SNP. 245 """ 246 return self.__variants_qual 247 248 @variants_qual.setter 249 def variants_qual(self, x: np.ndarray): 250 """ 251 Update `variants_qual`. 252 """ 253 self.__variants_qual = x 254 255 @property 256 def calldata_lai(self) -> Optional[np.ndarray]: 257 """ 258 Retrieve `calldata_lai`. 259 260 Returns: 261 **array:** 262 An array containing the ancestry for each SNP. This array can be either 2D with shape 263 `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2). 264 """ 265 return self.__calldata_lai 266 267 @calldata_lai.setter 268 def calldata_lai(self, x: np.ndarray): 269 """ 270 Update `calldata_lai`. 271 """ 272 self.__calldata_lai = x 273 274 @property 275 def ancestry_map(self) -> Optional[Dict[str, str]]: 276 """ 277 Retrieve `ancestry_map`. 278 279 Returns: 280 **dict of str to str:** A dictionary mapping ancestry codes to region names. 281 """ 282 return self.__ancestry_map 283 284 @ancestry_map.setter 285 def ancestry_map(self, x): 286 """ 287 Update `ancestry_map`. 288 """ 289 self.__ancestry_map = x 290 291 @property 292 def n_samples(self) -> int: 293 """ 294 Retrieve `n_samples`. 295 296 Returns: 297 **int:** The total number of samples. 298 """ 299 if self.__samples is not None: 300 return len(self.__samples) 301 elif self.__calldata_gt is not None: 302 return self.__calldata_gt.shape[1] 303 elif self.__calldata_lai is not None: 304 if self.__calldata_lai.ndim == 2: 305 return self.__calldata_lai.shape[1] // 2 306 elif self.__calldata_lai.ndim == 3: 307 return self.__calldata_lai.shape[1] 308 else: 309 raise ValueError("Unable to determine the total number of samples: no relevant data is available.") 310 311 @property 312 def n_snps(self) -> int: 313 """ 314 Retrieve `n_snps`. 315 316 Returns: 317 **int:** The total number of SNPs. 318 """ 319 # List of attributes that can indicate the number of SNPs 320 potential_attributes = [ 321 self.__calldata_gt, 322 self.__variants_ref, 323 self.__variants_alt, 324 self.__variants_chrom, 325 self.__variants_filter_pass, 326 self.__variants_id, 327 self.__variants_pos, 328 self.__variants_qual, 329 self.__calldata_lai 330 ] 331 332 # Check each attribute for its first dimension, which corresponds to `n_snps` 333 for attr in potential_attributes: 334 if attr is not None: 335 return attr.shape[0] 336 337 raise ValueError("Unable to determine the total number of SNPs: no relevant data is available.") 338 339 @property 340 def n_chrom(self) -> Optional[int]: 341 """ 342 Retrieve `n_chrom`. 343 344 Returns: 345 **int:** The total number of unique chromosomes in `variants_chrom`. 346 """ 347 if self.variants_chrom is None: 348 warnings.warn("Chromosome data `variants_chrom` is None.") 349 return None 350 351 return len(self.unique_chrom) 352 353 @property 354 def n_ancestries(self) -> int: 355 """ 356 Retrieve `n_ancestries`. 357 358 Returns: 359 **int:** The total number of unique ancestries. 360 """ 361 if self.__calldata_lai is not None: 362 return len(np.unique(self.__calldata_lai)) 363 else: 364 raise ValueError("Unable to determine the total number of ancestries: no relevant data is available.") 365 366 @property 367 def unique_chrom(self) -> Optional[np.ndarray]: 368 """ 369 Retrieve `unique_chrom`. 370 371 Returns: 372 **array:** The unique chromosome names in `variants_chrom`, preserving their order of appearance. 373 """ 374 if self.variants_chrom is None: 375 warnings.warn("Chromosome data `variants_chrom` is None.") 376 return None 377 378 # Identify unique chromosome names and their first indexes of occurrence 379 _, idx = np.unique(self.variants_chrom, return_index=True) 380 # Return chromosome names sorted by their first occurrence to maintain original order 381 return self.variants_chrom[np.sort(idx)] 382 383 @property 384 def are_strands_summed(self) -> bool: 385 """ 386 Retrieve `are_strands_summed`. 387 388 Returns: 389 **bool:** 390 True if the maternal and paternal strands have been summed together, which is indicated by 391 `calldata_gt` having shape `(n_samples, n_snps)`. False if the strands are stored separately, 392 indicated by `calldata_gt` having shape `(n_samples, n_snps, 2)`. 393 """ 394 if self.calldata_gt is None: 395 warnings.warn("Genotype data `calldata_gt` is None.") 396 return None 397 398 return self.calldata_gt.ndim == 2 399 400 def copy(self) -> 'SNPObject': 401 """ 402 Create and return a copy of `self`. 403 404 Returns: 405 **SNPObject:** 406 A new instance of the current object. 407 """ 408 return copy.deepcopy(self) 409 410 def keys(self) -> List[str]: 411 """ 412 Retrieve a list of public attribute names for `self`. 413 414 Returns: 415 **list of str:** 416 A list of attribute names, with internal name-mangling removed, 417 for easier reference to public attributes in the instance. 418 """ 419 return [attr.replace('_SNPObject__', '') for attr in vars(self)] 420 421 def sum_strands(self, inplace: bool = False) -> Optional['SNPObject']: 422 """ 423 Sum paternal and maternal strands. 424 425 Args: 426 inplace (bool, default=False): 427 If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 428 filtered. Default is False. 429 430 Returns: 431 **Optional[SNPObject]:** 432 A new `SNPObject` with summed strands if `inplace=False`. 433 If `inplace=True`, modifies `self` in place and returns None. 434 """ 435 if self.calldata_gt is None: 436 warnings.warn("Genotype data `calldata_gt` is None.") 437 return None if not inplace else self 438 439 if self.are_strands_summed: 440 warnings.warn("Genotype data `calldata_gt` is already summed.") 441 return self if inplace else self.copy() 442 443 if inplace: 444 self.calldata_gt = self.calldata_gt.sum(axis=2, dtype=np.int8) 445 return self 446 else: 447 snpobj = self.copy() 448 snpobj.calldata_gt = self.calldata_gt.sum(axis=2, dtype=np.int8) 449 return snpobj 450 451 def filter_variants( 452 self, 453 chrom: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 454 pos: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 455 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 456 include: bool = True, 457 inplace: bool = False 458 ) -> Optional['SNPObject']: 459 """ 460 Filter variants based on specified chromosome names, variant positions, or variant indexes. 461 462 This method updates the `calldata_gt`, `variants_ref`, `variants_alt`, 463 `variants_chrom`, `variants_filter_pass`, `variants_id`, `variants_pos`, 464 `variants_qual`, and `lai` attributes to include or exclude the specified variants. The filtering 465 criteria can be based on chromosome names, variant positions, or indexes. If multiple 466 criteria are provided, their union is used for filtering. The order of the variants is preserved. 467 468 Negative indexes are supported and follow 469 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 470 471 Args: 472 chrom (str or array_like of str, optional): 473 Chromosome(s) to filter variants by. Can be a single chromosome as a string or a sequence 474 of chromosomes. If both `chrom` and `pos` are provided, they must either have matching lengths 475 (pairing each chromosome with a position) or `chrom` should be a single value that applies to 476 all positions in `pos`. Default is None. 477 pos (int or array_like of int, optional): 478 Position(s) to filter variants by. Can be a single position as an integer or a sequence of positions. 479 If `chrom` is also provided, `pos` should either match `chrom` in length or `chrom` should be a 480 single value. Default is None. 481 indexes (int or array_like of int, optional): 482 Index(es) of the variants to include or exclude. Can be a single index or a sequence 483 of indexes. Negative indexes are supported. Default is None. 484 include (bool, default=True): 485 If True, includes only the specified variants. If False, excludes the specified 486 variants. Default is True. 487 inplace (bool, default=False): 488 If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 489 filtered. Default is False. 490 491 Returns: 492 **Optional[SNPObject]:** 493 A new `SNPObject` with the specified variants filtered if `inplace=False`. 494 If `inplace=True`, modifies `self` in place and returns None. 495 """ 496 if chrom is None and pos is None and indexes is None: 497 raise ValueError("At least one of 'chrom', 'pos', or 'indexes' must be provided.") 498 499 n_snps = self.n_snps 500 501 # Convert inputs to arrays for consistency 502 chrom = np.atleast_1d(chrom) if chrom is not None else None 503 pos = np.atleast_1d(pos) if pos is not None else None 504 indexes = np.atleast_1d(indexes) if indexes is not None else None 505 506 # Validate chrom and pos lengths if both are provided 507 if chrom is not None and pos is not None: 508 if len(chrom) != len(pos) and len(chrom) > 1: 509 raise ValueError( 510 "When both 'chrom' and 'pos' are provided, they must either be of the same length " 511 "or 'chrom' must be a single value." 512 ) 513 514 # Create a mask for chromosome and position filtering 515 mask_combined = np.zeros(n_snps, dtype=bool) 516 if chrom is not None and pos is not None: 517 if len(chrom) == 1: 518 # Apply single chromosome to all positions in `pos` 519 mask_combined = (self['variants_chrom'] == chrom[0]) & np.isin(self['variants_pos'], pos) 520 else: 521 # Vectorized pair matching for chrom and pos 522 query_pairs = np.array( 523 list(zip(chrom, pos)), 524 dtype=[ 525 ('chrom', self['variants_chrom'].dtype), 526 ('pos', self['variants_pos'].dtype) 527 ] 528 ) 529 data_pairs = np.array( 530 list(zip(self['variants_chrom'], self['variants_pos'])), 531 dtype=[ 532 ('chrom', self['variants_chrom'].dtype), 533 ('pos', self['variants_pos'].dtype) 534 ] 535 ) 536 mask_combined = np.isin(data_pairs, query_pairs) 537 538 elif chrom is not None: 539 # Only chromosome filtering 540 mask_combined = np.isin(self['variants_chrom'], chrom) 541 elif pos is not None: 542 # Only position filtering 543 mask_combined = np.isin(self['variants_pos'], pos) 544 545 # Create mask based on indexes if provided 546 if indexes is not None: 547 # Validate indexes, allowing negative indexes 548 out_of_bounds_indexes = indexes[(indexes < -n_snps) | (indexes >= n_snps)] 549 if out_of_bounds_indexes.size > 0: 550 raise ValueError(f"One or more sample indexes are out of bounds.") 551 552 # Handle negative indexes and check for out-of-bounds indexes 553 adjusted_indexes = np.mod(indexes, n_snps) 554 555 # Create mask for specified indexes 556 mask_indexes = np.zeros(n_snps, dtype=bool) 557 mask_indexes[adjusted_indexes] = True 558 559 # Combine with `chrom` and `pos` mask using logical OR (union of all specified criteria) 560 mask_combined = mask_combined | mask_indexes 561 562 # Invert mask if `include` is False 563 if not include: 564 mask_combined = ~mask_combined 565 566 # Define keys to filter 567 keys = [ 568 'calldata_gt', 'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 569 'variants_id', 'variants_pos', 'variants_qual', 'calldata_lai' 570 ] 571 572 # Apply filtering based on inplace parameter 573 if inplace: 574 for key in keys: 575 if self[key] is not None: 576 if self[key].ndim > 1: 577 self[key] = np.asarray(self[key])[mask_combined, ...] 578 else: 579 self[key] = np.asarray(self[key])[mask_combined] 580 581 return None 582 else: 583 # Create A new `SNPObject` with filtered data 584 snpobj = self.copy() 585 for key in keys: 586 if snpobj[key] is not None: 587 if snpobj[key].ndim > 1: 588 snpobj[key] = np.asarray(snpobj[key])[mask_combined, ...] 589 else: 590 snpobj[key] = np.asarray(snpobj[key])[mask_combined] 591 592 return snpobj 593 594 def filter_samples( 595 self, 596 samples: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 597 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 598 include: bool = True, 599 reorder: bool = False, 600 inplace: bool = False 601 ) -> Optional['SNPObject']: 602 """ 603 Filter samples based on specified names or indexes. 604 605 This method updates the `samples` and `calldata_gt` attributes to include or exclude the specified 606 samples. The order of the samples is preserved. Set `reorder=True` to match the ordering of the 607 provided `samples` and/or `indexes` lists when including. 608 609 If both samples and indexes are provided, any sample matching either a name in samples or an index in 610 indexes will be included or excluded. 611 612 This method allows inclusion or exclusion of specific samples by their names or 613 indexes. When both sample names and indexes are provided, the union of the specified samples 614 is used. Negative indexes are supported and follow 615 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 616 617 Args: 618 samples (str or array_like of str, optional): 619 Name(s) of the samples to include or exclude. Can be a single sample name or a 620 sequence of sample names. Default is None. 621 indexes (int or array_like of int, optional): 622 Index(es) of the samples to include or exclude. Can be a single index or a sequence 623 of indexes. Negative indexes are supported. Default is None. 624 include (bool, default=True): 625 If True, includes only the specified samples. If False, excludes the specified 626 samples. Default is True. 627 inplace (bool, default=False): 628 If True, modifies `self` in place. If False, returns a new `SNPObject` with the samples 629 filtered. Default is False. 630 631 Returns: 632 **Optional[SNPObject]:** 633 A new `SNPObject` with the specified samples filtered if `inplace=False`. 634 If `inplace=True`, modifies `self` in place and returns None. 635 """ 636 if samples is None and indexes is None: 637 raise ValueError("At least one of 'samples' or 'indexes' must be provided.") 638 639 n_samples = self.n_samples 640 sample_names = np.array(self['samples']) 641 642 # Create mask based on sample names 643 if samples is not None: 644 samples = np.asarray(samples).ravel() 645 mask_samples = np.isin(sample_names, samples) 646 missing_samples = samples[~np.isin(samples, sample_names)] 647 if missing_samples.size > 0: 648 raise ValueError(f"The following specified samples were not found: {missing_samples.tolist()}") 649 else: 650 mask_samples = np.zeros(n_samples, dtype=bool) 651 652 # Create mask based on sample indexes 653 if indexes is not None: 654 indexes = np.asarray(indexes).ravel() 655 656 # Validate indexes, allowing negative indexes 657 out_of_bounds_indexes = indexes[(indexes < -n_samples) | (indexes >= n_samples)] 658 if out_of_bounds_indexes.size > 0: 659 raise ValueError(f"One or more sample indexes are out of bounds.") 660 661 # Handle negative indexes 662 adjusted_indexes = np.mod(indexes, n_samples) 663 664 mask_indexes = np.zeros(n_samples, dtype=bool) 665 mask_indexes[adjusted_indexes] = True 666 else: 667 mask_indexes = np.zeros(n_samples, dtype=bool) 668 669 # Combine masks using logical OR (union of samples) 670 mask_combined = mask_samples | mask_indexes 671 672 if not include: 673 mask_combined = ~mask_combined 674 675 # If requested, compute an ordering of selected samples that follows the provided lists. 676 ordered_indices = None 677 if include and reorder: 678 sel_indices = np.where(mask_combined)[0] 679 ordered_list: List[int] = [] 680 added = np.zeros(n_samples, dtype=bool) 681 682 # Prioritize the order in `samples` 683 if samples is not None: 684 name_to_idx = {name: idx for idx, name in enumerate(sample_names)} 685 for s in samples: 686 idx = name_to_idx.get(s) 687 if idx is not None and mask_combined[idx] and not added[idx]: 688 ordered_list.append(idx) 689 added[idx] = True 690 691 # Then respect the order in `indexes` 692 if indexes is not None: 693 adj_idx = np.mod(np.atleast_1d(indexes), n_samples) 694 for idx in adj_idx: 695 if mask_combined[idx] and not added[idx]: 696 ordered_list.append(int(idx)) 697 added[idx] = True 698 699 # Finally, append any remaining selected samples in their original order 700 for idx in sel_indices: 701 if not added[idx]: 702 ordered_list.append(int(idx)) 703 704 ordered_indices = np.asarray(ordered_list, dtype=int) 705 706 # Define keys to filter 707 keys = ['samples', 'calldata_gt', 'calldata_lai'] 708 709 # Apply filtering based on inplace parameter 710 if inplace: 711 for key in keys: 712 if self[key] is not None: 713 arr = np.asarray(self[key]) 714 if ordered_indices is not None: 715 if key == 'calldata_lai' and arr.ndim == 2: 716 # Haplotype-aware reordering for 2D LAI (n_snps, n_samples*2) 717 hap_idx = np.concatenate([2*ordered_indices, 2*ordered_indices + 1]) 718 self[key] = arr[:, hap_idx] 719 elif arr.ndim > 1: 720 self[key] = arr[:, ordered_indices, ...] 721 else: 722 self[key] = arr[ordered_indices] 723 else: 724 if arr.ndim > 1: 725 self[key] = arr[:, mask_combined, ...] 726 else: 727 self[key] = arr[mask_combined] 728 729 return None 730 else: 731 # Create A new `SNPObject` with filtered data 732 snpobj = self.copy() 733 for key in keys: 734 if snpobj[key] is not None: 735 arr = np.asarray(snpobj[key]) 736 if ordered_indices is not None: 737 if key == 'calldata_lai' and arr.ndim == 2: 738 hap_idx = np.concatenate([2*ordered_indices, 2*ordered_indices + 1]) 739 snpobj[key] = arr[:, hap_idx] 740 elif arr.ndim > 1: 741 snpobj[key] = arr[:, ordered_indices, ...] 742 else: 743 snpobj[key] = arr[ordered_indices] 744 else: 745 if arr.ndim > 1: 746 snpobj[key] = arr[:, mask_combined, ...] 747 else: 748 snpobj[key] = arr[mask_combined] 749 return snpobj 750 751 def detect_chromosome_format(self) -> str: 752 """ 753 Detect the chromosome naming convention in `variants_chrom` based on the prefix 754 of the first chromosome identifier in `unique_chrom`. 755 756 **Recognized formats:** 757 758 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 759 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 760 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 761 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 762 763 If the format does not match any recognized pattern, `'Unknown format'` is returned. 764 765 Returns: 766 **str:** 767 A string indicating the detected chromosome format (`'chr'`, `'chm'`, `'chrom'`, or `'plain'`). 768 If no recognized format is matched, returns `'Unknown format'`. 769 """ 770 # Select the first unique chromosome identifier for format detection 771 chromosome_str = self.unique_chrom[0] 772 773 # Define regular expressions to match each recognized chromosome format 774 patterns = { 775 'chr': r'^chr(\d+|X|Y|M)$', # Matches 'chr' prefixed format 776 'chm': r'^chm(\d+|X|Y|M)$', # Matches 'chm' prefixed format 777 'chrom': r'^chrom(\d+|X|Y|M)$', # Matches 'chrom' prefixed format 778 'plain': r'^(\d+|X|Y|M)$' # Matches plain format without prefix 779 } 780 781 # Iterate through the patterns to identify the chromosome format 782 for prefix, pattern in patterns.items(): 783 if re.match(pattern, chromosome_str): 784 return prefix # Return the recognized format prefix 785 786 # If no pattern matches, return 'Unknown format' 787 return 'Unknown format' 788 789 def convert_chromosome_format( 790 self, 791 from_format: str, 792 to_format: str, 793 inplace: bool = False 794 ) -> Optional['SNPObject']: 795 """ 796 Convert the chromosome format from one naming convention to another in `variants_chrom`. 797 798 **Supported formats:** 799 800 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 801 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 802 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 803 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 804 805 Args: 806 from_format (str): 807 The current chromosome format. Acceptable values are `'chr'`, `'chm'`, `'chrom'`, or `'plain'`. 808 to_format (str): 809 The target format for chromosome data conversion. Acceptable values match `from_format` options. 810 inplace (bool, default=False): 811 If True, modifies `self` in place. If False, returns a new `SNPObject` with the converted format. 812 Default is False. 813 814 Returns: 815 **Optional[SNPObject]:** A new `SNPObject` with the converted chromosome format if `inplace=False`. 816 If `inplace=True`, modifies `self` in place and returns None. 817 """ 818 # Define the list of standard chromosome identifiers 819 chrom_list = [*map(str, range(1, 23)), 'X', 'Y', 'M'] # M for mitochondrial chromosomes 820 821 # Format mappings for different chromosome naming conventions 822 format_mappings = { 823 'chr': [f'chr{i}' for i in chrom_list], 824 'chm': [f'chm{i}' for i in chrom_list], 825 'chrom': [f'chrom{i}' for i in chrom_list], 826 'plain': chrom_list, 827 } 828 829 # Verify that from_format and to_format are valid naming conventions 830 if from_format not in format_mappings or to_format not in format_mappings: 831 raise ValueError(f"Invalid format: {from_format} or {to_format}. Must be one of {list(format_mappings.keys())}.") 832 833 # Convert chromosomes to string for consistent comparison 834 variants_chrom = self['variants_chrom'].astype(str) 835 836 # Verify that all chromosomes in the object follow the specified `from_format` 837 expected_chroms = set(format_mappings[from_format]) 838 mismatched_chroms = set(variants_chrom) - expected_chroms 839 840 if mismatched_chroms: 841 raise ValueError(f"The following chromosomes do not match the `from_format` '{from_format}': {mismatched_chroms}.") 842 843 # Create conditions for selecting based on current `from_format` names 844 conditions = [variants_chrom == chrom for chrom in format_mappings[from_format]] 845 846 # Rename chromosomes based on inplace flag 847 if inplace: 848 self['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown') 849 return None 850 else: 851 snpobject = self.copy() 852 snpobject['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown') 853 return snpobject 854 855 def match_chromosome_format(self, snpobj: 'SNPObject', inplace: bool = False) -> Optional['SNPObject']: 856 """ 857 Convert the chromosome format in `variants_chrom` from `self` to match the format of a reference `snpobj`. 858 859 **Recognized formats:** 860 861 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 862 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 863 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 864 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 865 866 Args: 867 snpobj (SNPObject): 868 The reference SNPObject whose chromosome format will be matched. 869 inplace (bool, default=False): 870 If True, modifies `self` in place. If False, returns a new `SNPObject` with the 871 chromosome format matching that of `snpobj`. Default is False. 872 873 Returns: 874 **Optional[SNPObject]:** 875 A new `SNPObject` with matched chromosome format if `inplace=False`. 876 If `inplace=True`, modifies `self` in place and returns None. 877 """ 878 # Detect the chromosome naming format of the current SNPObject 879 fmt1 = self.detect_chromosome_format() 880 if fmt1 == 'Unknown format': 881 raise ValueError("The chromosome format of the current SNPObject is unrecognized.") 882 883 # Detect the chromosome naming format of the reference SNPObject 884 fmt2 = snpobj.detect_chromosome_format() 885 if fmt2 == 'Unknown format': 886 raise ValueError("The chromosome format of the reference SNPObject is unrecognized.") 887 888 # Convert the current SNPObject's chromosome format to match the reference format 889 return self.convert_chromosome_format(fmt1, fmt2, inplace=inplace) 890 891 def rename_chrom( 892 self, 893 to_replace: Union[Dict[str, str], str, List[str]] = {'^([0-9]+)$': r'chr\1', r'^chr([0-9]+)$': r'\1'}, 894 value: Optional[Union[str, List[str]]] = None, 895 regex: bool = True, 896 inplace: bool = False 897 ) -> Optional['SNPObject']: 898 """ 899 Replace chromosome values in `variants_chrom` using patterns or exact matches. 900 901 This method allows flexible chromosome replacements, using regex or exact matches, useful 902 for non-standard chromosome formats. For standard conversions (e.g., 'chr1' to '1'), 903 consider `convert_chromosome_format`. 904 905 Args: 906 to_replace (dict, str, or list of str): 907 Pattern(s) or exact value(s) to be replaced in chromosome names. Default behavior 908 transforms `<chrom_num>` to `chr<chrom_num>` or vice versa. Non-matching values 909 remain unchanged. 910 - If str or list of str: Matches will be replaced with `value`. 911 - If regex (bool), then any regex matches will be replaced with `value`. 912 - If dict: Keys defines values to replace, with corresponding replacements as values. 913 value (str or list of str, optional): 914 Replacement value(s) if `to_replace` is a string or list. Ignored if `to_replace` 915 is a dictionary. 916 regex (bool, default=True): 917 If True, interprets `to_replace` keys as regex patterns. 918 inplace (bool, default=False): 919 If True, modifies `self` in place. If False, returns a new `SNPObject` with the chromosomes 920 renamed. Default is False. 921 922 Returns: 923 **Optional[SNPObject]:** A new `SNPObject` with the renamed chromosome format if `inplace=False`. 924 If `inplace=True`, modifies `self` in place and returns None. 925 """ 926 # Standardize input format: convert `to_replace` and `value` to a dictionary if needed 927 if isinstance(to_replace, (str, int)): 928 to_replace = [to_replace] 929 if isinstance(value, (str, int)): 930 value = [value] 931 if isinstance(to_replace, list) and isinstance(value, list): 932 dictionary = dict(zip(to_replace, value)) 933 elif isinstance(to_replace, dict) and value is None: 934 dictionary = to_replace 935 else: 936 raise ValueError( 937 "Invalid input: `to_replace` and `value` must be compatible types (both str, list of str, or dict)." 938 ) 939 940 # Vectorized function for replacing values in chromosome array 941 vec_replace_values = np.vectorize(self._match_to_replace) 942 943 # Rename chromosomes based on inplace flag 944 if inplace: 945 self.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex) 946 return None 947 else: 948 snpobj = self.copy() 949 snpobj.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex) 950 return snpobj 951 952 def rename_missings( 953 self, 954 before: Union[int, float, str] = -1, 955 after: Union[int, float, str] = '.', 956 inplace: bool = False 957 ) -> Optional['SNPObject']: 958 """ 959 Replace missing values in the `calldata_gt` attribute. 960 961 This method identifies missing values in 'calldata_gt' and replaces them with a specified 962 value. By default, it replaces occurrences of `-1` (often used to signify missing data) with `'.'`. 963 964 Args: 965 before (int, float, or str, default=-1): 966 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 967 Default is -1. 968 after (int, float, or str, default='.'): 969 The value that will replace `before`. Default is '.'. 970 inplace (bool, default=False): 971 If True, modifies `self` in place. If False, returns a new `SNPObject` with the applied 972 replacements. Default is False. 973 974 Returns: 975 **Optional[SNPObject]:** 976 A new `SNPObject` with the renamed missing values if `inplace=False`. 977 If `inplace=True`, modifies `self` in place and returns None. 978 """ 979 # Rename missing values in the `calldata_gt` attribute based on inplace flag 980 if inplace: 981 self['calldata_gt'] = np.where(self['calldata_gt'] == before, after, self['calldata_gt']) 982 return None 983 else: 984 snpobj = self.copy() 985 snpobj['calldata_gt'] = np.where(snpobj['calldata_gt'] == before, after, snpobj['calldata_gt']) 986 return snpobj 987 988 def get_common_variants_intersection( 989 self, 990 snpobj: 'SNPObject', 991 index_by: str = 'pos' 992 ) -> Tuple[List[str], np.ndarray, np.ndarray]: 993 """ 994 Identify common variants between `self` and the `snpobj` instance based on the specified `index_by` criterion, 995 which may match based on chromosome and position (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both. 996 997 This method returns the identifiers of common variants and their corresponding indices in both objects. 998 999 Args: 1000 snpobj (SNPObject): 1001 The reference SNPObject to compare against. 1002 index_by (str, default='pos'): 1003 Criteria for matching variants. Options: 1004 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1005 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1006 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1007 Default is 'pos'. 1008 1009 Returns: 1010 Tuple containing: 1011 - **list of str:** A list of common variant identifiers (as strings). 1012 - **array:** An array of indices in `self` where common variants are located. 1013 - **array:** An array of indices in `snpobj` where common variants are located. 1014 """ 1015 # Create unique identifiers for each variant in both SNPObjects based on the specified criterion 1016 if index_by == 'pos': 1017 query_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(self['variants_chrom'], self['variants_pos'])] 1018 reference_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(snpobj['variants_chrom'], snpobj['variants_pos'])] 1019 elif index_by == 'id': 1020 query_identifiers = self['variants_id'].tolist() 1021 reference_identifiers = snpobj['variants_id'].tolist() 1022 elif index_by == 'pos+id': 1023 query_identifiers = [ 1024 f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(self['variants_chrom'], self['variants_pos'], self['variants_id']) 1025 ] 1026 reference_identifiers = [ 1027 f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_id']) 1028 ] 1029 else: 1030 raise ValueError("`index_by` must be one of 'pos', 'id', or 'pos+id'.") 1031 1032 # Convert to sets for intersection 1033 common_ids = set(query_identifiers).intersection(reference_identifiers) 1034 1035 # Collect indices for common identifiers 1036 query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids] 1037 reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids] 1038 1039 return list(common_ids), np.array(query_idx), np.array(reference_idx) 1040 1041 def get_common_markers_intersection( 1042 self, 1043 snpobj: 'SNPObject' 1044 ) -> Tuple[List[str], np.ndarray, np.ndarray]: 1045 """ 1046 Identify common markers between between `self` and the `snpobj` instance. Common markers are identified 1047 based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 1048 and alternate (`variants_alt`) alleles. 1049 1050 This method returns the identifiers of common markers and their corresponding indices in both objects. 1051 1052 Args: 1053 snpobj (SNPObject): 1054 The reference SNPObject to compare against. 1055 1056 Returns: 1057 Tuple containing: 1058 - **list of str:** A list of common variant identifiers (as strings). 1059 - **array:** An array of indices in `self` where common variants are located. 1060 - **array:** An array of indices in `snpobj` where common variants are located. 1061 """ 1062 # Generate unique identifiers based on chrom, pos, ref, and alt alleles 1063 query_identifiers = [ 1064 f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 1065 zip(self['variants_chrom'], self['variants_pos'], self['variants_ref'], self['variants_alt']) 1066 ] 1067 reference_identifiers = [ 1068 f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 1069 zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_ref'], snpobj['variants_alt']) 1070 ] 1071 1072 # Convert to sets for intersection 1073 common_ids = set(query_identifiers).intersection(reference_identifiers) 1074 1075 # Collect indices for common identifiers in both SNPObjects 1076 query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids] 1077 reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids] 1078 1079 return list(common_ids), np.array(query_idx), np.array(reference_idx) 1080 1081 def subset_to_common_variants( 1082 self, 1083 snpobj: 'SNPObject', 1084 index_by: str = 'pos', 1085 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1086 inplace: bool = False 1087 ) -> Optional['SNPObject']: 1088 """ 1089 Subset `self` to include only the common variants with a reference `snpobj` based on 1090 the specified `index_by` criterion, which may match based on chromosome and position 1091 (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both. 1092 1093 Args: 1094 snpobj (SNPObject): 1095 The reference SNPObject to compare against. 1096 index_by (str, default='pos'): 1097 Criteria for matching variants. Options: 1098 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1099 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1100 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1101 Default is 'pos'. 1102 common_variants_intersection (Tuple[np.ndarray, np.ndarray], optional): 1103 Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 1104 computed within the function. 1105 inplace (bool, default=False): 1106 If True, modifies `self` in place. If False, returns a new `SNPObject` with the common variants 1107 subsetted. Default is False. 1108 1109 Returns: 1110 **Optional[SNPObject]:** 1111 A new `SNPObject` with the common variants subsetted if `inplace=False`. 1112 If `inplace=True`, modifies `self` in place and returns None. 1113 """ 1114 # Get indices of common variants if not provided 1115 if common_variants_intersection is None: 1116 _, query_idx, _ = self.get_common_variants_intersection(snpobj, index_by=index_by) 1117 else: 1118 query_idx, _ = common_variants_intersection 1119 1120 # Use filter_variants method with the identified indices, applying `inplace` as specified 1121 return self.filter_variants(indexes=query_idx, include=True, inplace=inplace) 1122 1123 def subset_to_common_markers( 1124 self, 1125 snpobj: 'SNPObject', 1126 common_markers_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1127 inplace: bool = False 1128 ) -> Optional['SNPObject']: 1129 """ 1130 Subset `self` to include only the common markers with a reference `snpobj`. Common markers are identified 1131 based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 1132 and alternate (`variants_alt`) alleles. 1133 1134 Args: 1135 snpobj (SNPObject): 1136 The reference SNPObject to compare against. 1137 common_markers_intersection (tuple of arrays, optional): 1138 Precomputed indices of common markers between `self` and `snpobj`. If None, intersection is 1139 computed within the function. 1140 inplace (bool, default=False): 1141 If True, modifies `self` in place. If False, returns a new `SNPObject` with the common markers 1142 subsetted. Default is False. 1143 1144 Returns: 1145 **Optional[SNPObject]:** 1146 A new `SNPObject` with the common markers subsetted if `inplace=False`. 1147 If `inplace=True`, modifies `self` in place and returns None. 1148 """ 1149 # Get indices of common markers if not provided 1150 if common_markers_intersection is None: 1151 _, query_idx, _ = self.get_common_markers_intersection(snpobj) 1152 else: 1153 query_idx, _ = common_markers_intersection 1154 1155 # Use filter_variants method with the identified indices, applying `inplace` as specified 1156 return self.filter_variants(indexes=query_idx, include=True, inplace=inplace) 1157 1158 def merge( 1159 self, 1160 snpobj: 'SNPObject', 1161 force_samples: bool = False, 1162 prefix: str = '2', 1163 inplace: bool = False 1164 ) -> Optional['SNPObject']: 1165 """ 1166 Merge `self` with `snpobj` along the sample axis. 1167 1168 This method expects both SNPObjects to contain the same set of SNPs in the same order, 1169 then combines their genotype (`calldata_gt`) and LAI (`calldata_lai`) arrays by 1170 concatenating the sample dimension. Samples from `snpobj` are appended to those in `self`. 1171 1172 Args: 1173 snpobj (SNPObject): 1174 The SNPObject to merge samples with. 1175 force_samples (bool, default=False): 1176 If True, duplicate sample names are resolved by prepending the `prefix` to duplicate sample names in 1177 `snpobj`. Otherwise, merging fails when duplicate sample names are found. Default is False. 1178 prefix (str, default='2'): 1179 A string prepended to duplicate sample names in `snpobj` when `force_samples=True`. 1180 Duplicates are renamed from `<sample_name>` to `<prefix>:<sample_name>`. For instance, 1181 if `prefix='2'` and there is a conflict with a sample called "sample_1", it becomes "2:sample_1". 1182 inplace (bool, default=False): 1183 If True, modifies `self` in place. If False, returns a new `SNPObject` with the merged samples. 1184 Default is False. 1185 1186 Returns: 1187 **Optional[SNPObject]**: A new SNPObject containing the merged sample data. 1188 """ 1189 # Merge calldata_gt if present and compatible 1190 if self.calldata_gt is not None and snpobj.calldata_gt is not None: 1191 if self.calldata_gt.shape[0] != snpobj.calldata_gt.shape[0]: 1192 raise ValueError( 1193 f"Cannot merge SNPObjects: Mismatch in the number of SNPs in `calldata_gt`.\n" 1194 f"`self.calldata_gt` has {self.calldata_gt.shape[0]} SNPs, " 1195 f"while `snpobj.calldata_gt` has {snpobj.calldata_gt.shape[0]} SNPs." 1196 ) 1197 if self.are_strands_summed and not snpobj.are_strands_summed: 1198 raise ValueError( 1199 "Cannot merge SNPObjects: `self` has summed strands, but `snpobj` does not.\n" 1200 "Ensure both objects have the same genotype summation state before merging." 1201 ) 1202 if not self.are_strands_summed and snpobj.are_strands_summed: 1203 raise ValueError( 1204 "Cannot merge SNPObjects: `snpobj` has summed strands, but `self` does not.\n" 1205 "Ensure both objects have the same genotype summation state before merging." 1206 ) 1207 calldata_gt = np.concatenate([self.calldata_gt, snpobj.calldata_gt], axis=1) 1208 else: 1209 calldata_gt = None 1210 1211 # Merge samples if present and compatible, handling duplicates if `force_samples=True` 1212 if self.samples is not None and snpobj.samples is not None: 1213 overlapping_samples = set(self.samples).intersection(set(snpobj.samples)) 1214 if overlapping_samples: 1215 if not force_samples: 1216 raise ValueError( 1217 f"Cannot merge SNPObjects: Found overlapping sample names {overlapping_samples}.\n" 1218 "Samples must be strictly non-overlapping. To allow merging with renaming, set `force_samples=True`." 1219 ) 1220 else: 1221 # Rename duplicate samples by prepending the file index 1222 renamed_samples = [f"{prefix}:{sample}" if sample in overlapping_samples else sample for sample in snpobj.samples] 1223 samples = np.concatenate([self.samples, renamed_samples], axis=0) 1224 else: 1225 samples = np.concatenate([self.samples, snpobj.samples], axis=0) 1226 else: 1227 samples = None 1228 1229 # Merge LAI data if present and compatible 1230 if self.calldata_lai is not None and snpobj.calldata_lai is not None: 1231 if self.calldata_lai.ndim != snpobj.calldata_lai.ndim: 1232 raise ValueError( 1233 f"Cannot merge SNPObjects: Mismatch in `calldata_lai` dimensions.\n" 1234 f"`self.calldata_lai` has {self.calldata_lai.ndim} dimensions, " 1235 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.ndim} dimensions." 1236 ) 1237 if self.calldata_lai.shape[0] != snpobj.calldata_lai.shape[0]: 1238 raise ValueError( 1239 f"Cannot merge SNPObjects: Mismatch in the number of SNPs in `calldata_lai`.\n" 1240 f"`self.calldata_lai` has {self.calldata_lai.shape[0]} SNPs, " 1241 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.shape[0]} SNPs." 1242 ) 1243 calldata_lai = np.concatenate([self.calldata_lai, snpobj.calldata_lai], axis=1) 1244 else: 1245 calldata_lai = None 1246 1247 if inplace: 1248 self.calldata_gt = calldata_gt 1249 self.calldata_lai = calldata_lai 1250 self.samples = samples 1251 return self 1252 1253 # Create and return a new SNPObject containing the merged samples 1254 return SNPObject( 1255 calldata_gt=calldata_gt, 1256 samples=samples, 1257 variants_ref=self.variants_ref, 1258 variants_alt=self.variants_alt, 1259 variants_chrom=self.variants_chrom, 1260 variants_filter_pass=self.variants_filter_pass, 1261 variants_id=self.variants_id, 1262 variants_pos=self.variants_pos, 1263 variants_qual=self.variants_qual, 1264 calldata_lai=calldata_lai, 1265 ancestry_map=self.ancestry_map 1266 ) 1267 1268 def concat( 1269 self, 1270 snpobj: 'SNPObject', 1271 inplace: bool = False 1272 ) -> Optional['SNPObject']: 1273 """ 1274 Concatenate self with snpobj along the SNP axis. 1275 1276 This method expects both SNPObjects to contain the same set of samples in the same order, 1277 and that the chromosome(s) in snpobj follow (i.e. have higher numeric identifiers than) 1278 those in self. 1279 1280 Args: 1281 snpobj (SNPObject): 1282 The SNPObject to concatenate SNPs with. 1283 inplace (bool, default=False): 1284 If True, modifies `self` in place. If False, returns a new `SNPObject` with the concatenated SNPs. 1285 Default is False. 1286 1287 Returns: 1288 **Optional[SNPObject]**: A new SNPObject containing the concatenated SNP data. 1289 """ 1290 # Merge calldata_gt if present and compatible 1291 if self.calldata_gt is not None and snpobj.calldata_gt is not None: 1292 if self.calldata_gt.shape[1] != snpobj.calldata_gt.shape[1]: 1293 raise ValueError( 1294 f"Cannot merge SNPObjects: Mismatch in the number of samples in `calldata_gt`.\n" 1295 f"`self.calldata_gt` has {self.calldata_gt.shape[1]} samples, " 1296 f"while `snpobj.calldata_gt` has {snpobj.calldata_gt.shape[1]} samples." 1297 ) 1298 if self.are_strands_summed and not snpobj.are_strands_summed: 1299 raise ValueError( 1300 "Cannot merge SNPObjects: `self` has summed strands, but `snpobj` does not.\n" 1301 "Ensure both objects have the same genotype summation state before merging." 1302 ) 1303 if not self.are_strands_summed and snpobj.are_strands_summed: 1304 raise ValueError( 1305 "Cannot merge SNPObjects: `snpobj` has summed strands, but `self` does not.\n" 1306 "Ensure both objects have the same genotype summation state before merging." 1307 ) 1308 calldata_gt = np.concatenate([self.calldata_gt, snpobj.calldata_gt], axis=0) 1309 else: 1310 calldata_gt = None 1311 1312 # Merge SNP-related attributes if present 1313 attributes = [ 1314 'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 'variants_id', 'variants_pos', 'variants_qual' 1315 ] 1316 merged_attrs = {} 1317 for attr in attributes: 1318 self_attr = getattr(self, attr, None) 1319 obj_attr = getattr(snpobj, attr, None) 1320 1321 # Concatenate if both present 1322 if self_attr is not None and obj_attr is not None: 1323 merged_attrs[attr] = np.concatenate([self_attr, obj_attr], axis=0) 1324 else: 1325 # If either is None, store None 1326 merged_attrs[attr] = None 1327 1328 # Merge LAI data if present and compatible 1329 if self.calldata_lai is not None and snpobj.calldata_lai is not None: 1330 if self.calldata_lai.ndim != snpobj.calldata_lai.ndim: 1331 raise ValueError( 1332 f"Cannot merge SNPObjects: Mismatch in `calldata_lai` dimensions.\n" 1333 f"`self.calldata_lai` has {self.calldata_lai.ndim} dimensions, " 1334 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.ndim} dimensions." 1335 ) 1336 if self.calldata_lai.shape[1] != snpobj.calldata_lai.shape[1]: 1337 raise ValueError( 1338 f"Cannot merge SNPObjects: Mismatch in the number of samples in `calldata_lai`.\n" 1339 f"`self.calldata_lai` has {self.calldata_lai.shape[1]} samples, " 1340 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.shape[1]} samples." 1341 ) 1342 calldata_lai = np.concatenate([self.calldata_lai, snpobj.calldata_lai], axis=0) 1343 else: 1344 calldata_lai = None 1345 1346 if inplace: 1347 self.calldata_gt = calldata_gt 1348 self.calldata_lai = calldata_lai 1349 for attr in attributes: 1350 self[attr] = merged_attrs[attr] 1351 return self 1352 1353 # Create and return a new SNPObject containing the concatenated SNPs 1354 return SNPObject( 1355 calldata_gt=calldata_gt, 1356 calldata_lai=calldata_lai, 1357 samples=self.samples, 1358 variants_ref=merged_attrs['variants_ref'], 1359 variants_alt=merged_attrs['variants_alt'], 1360 variants_chrom=merged_attrs['variants_chrom'], 1361 variants_id=merged_attrs['variants_id'], 1362 variants_pos=merged_attrs['variants_pos'], 1363 variants_qual=merged_attrs['variants_qual'], 1364 variants_filter_pass=merged_attrs['variants_filter_pass'], 1365 ancestry_map=self.ancestry_map 1366 ) 1367 1368 def remove_strand_ambiguous_variants(self, inplace: bool = False) -> Optional['SNPObject']: 1369 """ 1370 A strand-ambiguous variant has reference (`variants_ref`) and alternate (`variants_alt`) alleles 1371 in the pairs A/T, T/A, C/G, or G/C, where both alleles are complementary and thus indistinguishable 1372 in terms of strand orientation. 1373 1374 Args: 1375 inplace (bool, default=False): 1376 If True, modifies `self` in place. If False, returns a new `SNPObject` with the 1377 strand-ambiguous variants removed. Default is False. 1378 1379 Returns: 1380 **Optional[SNPObject]:** A new `SNPObject` with non-ambiguous variants only if `inplace=False`. 1381 If `inplace=True`, modifies `self` in place and returns None. 1382 """ 1383 # Identify strand-ambiguous SNPs using vectorized comparisons 1384 is_AT = (self['variants_ref'] == 'A') & (self['variants_alt'] == 'T') 1385 is_TA = (self['variants_ref'] == 'T') & (self['variants_alt'] == 'A') 1386 is_CG = (self['variants_ref'] == 'C') & (self['variants_alt'] == 'G') 1387 is_GC = (self['variants_ref'] == 'G') & (self['variants_alt'] == 'C') 1388 1389 # Create a combined mask for all ambiguous variants 1390 ambiguous_mask = is_AT | is_TA | is_CG | is_GC 1391 non_ambiguous_idx = np.where(~ambiguous_mask)[0] 1392 1393 # Count each type of ambiguity using numpy's sum on boolean arrays 1394 A_T_count = np.sum(is_AT) 1395 T_A_count = np.sum(is_TA) 1396 C_G_count = np.sum(is_CG) 1397 G_C_count = np.sum(is_GC) 1398 1399 # Log the counts of each type of strand-ambiguous variants 1400 total_ambiguous = A_T_count + T_A_count + C_G_count + G_C_count 1401 log.info(f'{A_T_count} ambiguities of A-T type.') 1402 log.info(f'{T_A_count} ambiguities of T-A type.') 1403 log.info(f'{C_G_count} ambiguities of C-G type.') 1404 log.info(f'{G_C_count} ambiguities of G-C type.') 1405 1406 # Filter out ambiguous variants and keep non-ambiguous ones 1407 log.debug(f'Removing {total_ambiguous} strand-ambiguous variants...') 1408 return self.filter_variants(indexes=non_ambiguous_idx, include=True, inplace=inplace) 1409 1410 def correct_flipped_variants( 1411 self, 1412 snpobj: 'SNPObject', 1413 check_complement: bool = True, 1414 index_by: str = 'pos', 1415 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1416 log_stats: bool = True, 1417 inplace: bool = False 1418 ) -> Optional['SNPObject']: 1419 """ 1420 Correct flipped variants between between `self` and a reference `snpobj`, where reference (`variants_ref`) 1421 and alternate (`variants_alt`) alleles are swapped. 1422 1423 **Flip Detection Based on `check_complement`:** 1424 1425 - If `check_complement=False`, only direct allele swaps are considered: 1426 1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1427 1428 - If `check_complement=True`, both direct and complementary swaps are considered, with four possible cases: 1429 1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1430 2. **Complement Swap of Ref:** `complement(self.variants_ref) == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1431 3. **Complement Swap of Alt:** `self.variants_ref == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`. 1432 4. **Complement Swap of both Ref and Alt:** `complement(self.variants_ref) == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`. 1433 1434 **Note:** Variants where `self.variants_ref == self.variants_alt` are ignored as they are ambiguous. 1435 1436 **Correction Process:** 1437 - Swaps `variants_ref` and `variants_alt` alleles in `self` to align with `snpobj`. 1438 - Flips `calldata_gt` values (0 becomes 1, and 1 becomes 0) to match the updated allele configuration. 1439 1440 Args: 1441 snpobj (SNPObject): 1442 The reference SNPObject to compare against. 1443 check_complement (bool, default=True): 1444 If True, also checks for complementary base pairs (A/T, T/A, C/G, and G/C) when identifying swapped variants. 1445 Default is True. 1446 index_by (str, default='pos'): 1447 Criteria for matching variants. Options: 1448 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1449 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1450 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1451 Default is 'pos'. 1452 common_variants_intersection (tuple of arrays, optional): 1453 Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 1454 computed within the function. 1455 log_stats (bool, default=True): 1456 If True, logs statistical information about matching and ambiguous alleles. Default is True. 1457 inplace (bool, default=False): 1458 If True, modifies `self` in place. If False, returns a new `SNPObject` with corrected 1459 flips. Default is False. 1460 1461 Returns: 1462 **Optional[SNPObject]**: 1463 A new `SNPObject` with corrected flips if `inplace=False`. 1464 If `inplace=True`, modifies `self` in place and returns None. 1465 """ 1466 # Define complement mappings for nucleotides 1467 complement_map = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'} 1468 1469 # Helper function to get the complement of a base 1470 def get_complement(base: str) -> str: 1471 return complement_map.get(base, base) 1472 1473 # Get common variant indices if not provided 1474 if common_variants_intersection != None: 1475 query_idx, reference_idx = common_variants_intersection 1476 else: 1477 _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by) 1478 1479 # Log statistics on matching alleles if enabled 1480 if log_stats: 1481 matching_ref = np.sum(self['variants_ref'][query_idx] == snpobj['variants_ref'][reference_idx]) 1482 matching_alt = np.sum(self['variants_alt'][query_idx] == snpobj['variants_alt'][reference_idx]) 1483 ambiguous = np.sum(self['variants_ref'][query_idx] == self['variants_alt'][query_idx]) 1484 log.info(f"Matching reference alleles (ref=ref'): {matching_ref}, Matching alternate alleles (alt=alt'): {matching_alt}.") 1485 log.info(f"Number of ambiguous alleles (ref=alt): {ambiguous}.") 1486 1487 # Identify indices where `ref` and `alt` alleles are swapped 1488 if not check_complement: 1489 # Simple exact match for swapped alleles 1490 swapped_ref = (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) 1491 swapped_alt = (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) 1492 else: 1493 # Check for swapped or complementary-swapped alleles 1494 swapped_ref = ( 1495 (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) | 1496 (np.vectorize(get_complement)(self['variants_ref'][query_idx]) == snpobj['variants_alt'][reference_idx]) 1497 ) 1498 swapped_alt = ( 1499 (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) | 1500 (np.vectorize(get_complement)(self['variants_alt'][query_idx]) == snpobj['variants_ref'][reference_idx]) 1501 ) 1502 1503 # Filter out ambiguous variants where `ref` and `alt` alleles match (ref=alt) 1504 not_ambiguous = (self['variants_ref'][query_idx] != self['variants_alt'][query_idx]) 1505 1506 # Indices in `self` of flipped variants 1507 flip_idx_query = query_idx[swapped_ref & swapped_alt & not_ambiguous] 1508 1509 # Correct the identified variant flips 1510 if len(flip_idx_query) > 0: 1511 log.info(f'Correcting {len(flip_idx_query)} variant flips...') 1512 1513 temp_alts = self['variants_alt'][flip_idx_query] 1514 temp_refs = self['variants_ref'][flip_idx_query] 1515 1516 # Correct the variant flips based on whether the operation is in-place or not 1517 if inplace: 1518 self['variants_alt'][flip_idx_query] = temp_refs 1519 self['variants_ref'][flip_idx_query] = temp_alts 1520 self['calldata_gt'][flip_idx_query] = 1 - self['calldata_gt'][flip_idx_query] 1521 return None 1522 else: 1523 snpobj = self.copy() 1524 snpobj['variants_alt'][flip_idx_query] = temp_refs 1525 snpobj['variants_ref'][flip_idx_query] = temp_alts 1526 snpobj['calldata_gt'][flip_idx_query] = 1 - snpobj['calldata_gt'][flip_idx_query] 1527 return snpobj 1528 else: 1529 log.info('No variant flips found to correct.') 1530 return self if not inplace else None 1531 1532 def remove_mismatching_variants( 1533 self, 1534 snpobj: 'SNPObject', 1535 index_by: str = 'pos', 1536 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1537 inplace: bool = False 1538 ) -> Optional['SNPObject']: 1539 """ 1540 Remove variants from `self`, where reference (`variants_ref`) and/or alternate (`variants_alt`) alleles 1541 do not match with a reference `snpobj`. 1542 1543 Args: 1544 snpobj (SNPObject): 1545 The reference SNPObject to compare against. 1546 index_by (str, default='pos'): 1547 Criteria for matching variants. Options: 1548 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1549 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1550 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1551 Default is 'pos'. 1552 common_variants_intersection (tuple of arrays, optional): 1553 Precomputed indices of common variants between `self` and the reference `snpobj`. 1554 If None, the intersection is computed within the function. 1555 inplace (bool, default=False): 1556 If True, modifies `self` in place. If False, returns a new `SNPObject` without 1557 mismatching variants. Default is False. 1558 1559 Returns: 1560 **Optional[SNPObject]:** 1561 A new `SNPObject` without mismatching variants if `inplace=False`. 1562 If `inplace=True`, modifies `self` in place and returns None. 1563 """ 1564 # Get common variant indices if not provided 1565 if common_variants_intersection is not None: 1566 query_idx, reference_idx = common_variants_intersection 1567 else: 1568 _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by) 1569 1570 # Vectorized comparison of `ref` and `alt` alleles 1571 ref_mismatch = self['variants_ref'][query_idx] != snpobj['variants_ref'][reference_idx] 1572 alt_mismatch = self['variants_alt'][query_idx] != snpobj['variants_alt'][reference_idx] 1573 mismatch_mask = ref_mismatch | alt_mismatch 1574 1575 # Identify indices in `self` of mismatching variants 1576 mismatch_idx = query_idx[mismatch_mask] 1577 1578 # Compute total number of variant mismatches 1579 total_mismatches = np.sum(mismatch_mask) 1580 1581 # Filter out mismatching variants 1582 log.debug(f'Removing {total_mismatches} mismatching variants...') 1583 return self.filter_variants(indexes=mismatch_idx, include=True, inplace=inplace) 1584 1585 def shuffle_variants(self, inplace: bool = False) -> Optional['SNPObject']: 1586 """ 1587 Randomly shuffle the positions of variants in the SNPObject, ensuring that all associated 1588 data (e.g., `calldata_gt` and variant-specific attributes) remain aligned. 1589 1590 Args: 1591 inplace (bool, default=False): 1592 If True, modifies `self` in place. If False, returns a new `SNPObject` with 1593 shuffled variants. Default is False. 1594 1595 Returns: 1596 **Optional[SNPObject]:** 1597 A new `SNPObject` without shuffled variant positions if `inplace=False`. 1598 If `inplace=True`, modifies `self` in place and returns None. 1599 """ 1600 # Generate a random permutation index for shuffling variant positions 1601 shuffle_index = np.random.permutation(self.n_snps) 1602 1603 # Apply shuffling to all relevant attributes using the class's dictionary-like interface 1604 if inplace: 1605 for key in self.keys(): 1606 if self[key] is not None: 1607 if key == 'calldata_gt': 1608 # `calldata_gt`` has a different shape, so it's shuffled along axis 0 1609 self[key] = self[key][shuffle_index, ...] 1610 elif 'variant' in key: 1611 # snpobj attributes are 1D arrays 1612 self[key] = np.asarray(self[key])[shuffle_index] 1613 return None 1614 else: 1615 shuffled_snpobj = self.copy() 1616 for key in shuffled_snpobj.keys(): 1617 if shuffled_snpobj[key] is not None: 1618 if key == 'calldata_gt': 1619 shuffled_snpobj[key] = shuffled_snpobj[key][shuffle_index, ...] 1620 elif 'variant' in key: 1621 shuffled_snpobj[key] = np.asarray(shuffled_snpobj[key])[shuffle_index] 1622 return shuffled_snpobj 1623 1624 def set_empty_to_missing(self, inplace: bool = False) -> Optional['SNPObject']: 1625 """ 1626 Replace empty strings `''` with missing values `'.'` in attributes of `self`. 1627 1628 Args: 1629 inplace (bool, default=False): 1630 If True, modifies `self` in place. If False, returns a new `SNPObject` with empty 1631 strings `''` replaced by missing values `'.'`. Default is False. 1632 1633 Returns: 1634 **Optional[SNPObject]:** 1635 A new `SNPObject` with empty strings replaced if `inplace=False`. 1636 If `inplace=True`, modifies `self` in place and returns None. 1637 """ 1638 if inplace: 1639 if self.variants_alt is not None: 1640 self.variants_alt[self.variants_alt == ''] = '.' 1641 if self.variants_ref is not None: 1642 self.variants_ref[self.variants_ref == ''] = '.' 1643 if self.variants_qual is not None: 1644 self.variants_qual = self.variants_qual.astype(str) 1645 self.variants_qual[(self.variants_qual == '') | (self.variants_qual == 'nan')] = '.' 1646 if self.variants_chrom is not None: 1647 self.variants_chrom = self.variants_chrom.astype(str) 1648 self.variants_chrom[self.variants_chrom == ''] = '.' 1649 if self.variants_filter_pass is not None: 1650 self.variants_filter_pass[self.variants_filter_pass == ''] = '.' 1651 if self.variants_id is not None: 1652 self.variants_id[self.variants_id == ''] = '.' 1653 return self 1654 else: 1655 snpobj = self.copy() 1656 if snpobj.variants_alt is not None: 1657 snpobj.variants_alt[snpobj.variants_alt == ''] = '.' 1658 if snpobj.variants_ref is not None: 1659 snpobj.variants_ref[snpobj.variants_ref == ''] = '.' 1660 if snpobj.variants_qual is not None: 1661 snpobj.variants_qual = snpobj.variants_qual.astype(str) 1662 snpobj.variants_qual[(snpobj.variants_qual == '') | (snpobj.variants_qual == 'nan')] = '.' 1663 if snpobj.variants_chrom is not None: 1664 snpobj.variants_chrom[snpobj.variants_chrom == ''] = '.' 1665 if snpobj.variants_filter_pass is not None: 1666 snpobj.variants_filter_pass[snpobj.variants_filter_pass == ''] = '.' 1667 if snpobj.variants_id is not None: 1668 snpobj.variants_id[snpobj.variants_id == ''] = '.' 1669 return snpobj 1670 1671 def convert_to_window_level( 1672 self, 1673 window_size: Optional[int] = None, 1674 physical_pos: Optional[np.ndarray] = None, 1675 chromosomes: Optional[np.ndarray] = None, 1676 window_sizes: Optional[np.ndarray] = None, 1677 laiobj: Optional['LocalAncestryObject'] = None 1678 ) -> 'LocalAncestryObject': 1679 """ 1680 Aggregate the `calldata_lai` attribute into genomic windows within a 1681 `snputils.ancestry.genobj.LocalAncestryObject`. 1682 1683 **Options for defining windows (in order of precedence):** 1684 1685 1. **Fixed window size**: 1686 - Use `window_size` to specify how many SNPs go into each window. The last window on each 1687 chromosome may be larger if SNPs are not evenly divisible by the size. 1688 1689 2. **Custom start and end positions**: 1690 - Provide `physical_pos` (2D array of shape (n_windows, 2)) as the [start, end] base-pair 1691 coordinates for each window. 1692 - If `chromosomes` is not provided and `self` has exactly one chromosome, all windows are 1693 assumed to belong to that chromosome. 1694 - If multiple chromosomes exist but `chromosomes` is missing, an error will be raised. 1695 - Optionally, provide `window_sizes` to store the SNP count per-window. 1696 1697 3. **Matching existing windows**: 1698 - Reuse window definitions (`physical_pos`, `chromosomes`, `window_sizes`) from an existing `laiobj`. 1699 1700 Args: 1701 window_size (int, optional): 1702 Number of SNPs in each window if defining fixed-size windows. If the total number of 1703 SNPs in a chromosome is not evenly divisible by the window size, the last window on that 1704 chromosome will include all remaining SNPs and therefore be larger than the specified size. 1705 physical_pos (array of shape (n_windows, 2), optional): 1706 A 2D array containing the start and end physical positions for each window. 1707 chromosomes (array of shape (n_windows,), optional): 1708 An array with chromosome numbers corresponding to each genomic window. 1709 window_sizes (array of shape (n_windows,), optional): 1710 An array specifying the number of SNPs in each genomic window. 1711 laiobj (LocalAncestryObject, optional): 1712 A reference `LocalAncestryObject` from which to copy existing window definitions. 1713 1714 Returns: 1715 **LocalAncestryObject:** 1716 A LocalAncestryObject containing window-level ancestry data. 1717 """ 1718 from snputils.ancestry.genobj.local import LocalAncestryObject 1719 1720 if window_size is None and physical_pos is None and laiobj is None: 1721 raise ValueError("One of `window_size`, `physical_pos`, or `laiobj` must be provided.") 1722 1723 # Fixed window size 1724 if window_size is not None: 1725 physical_pos = [] # Boundaries [start, end] of each window 1726 chromosomes = [] # Chromosome for each window 1727 window_sizes = [] # Number of SNPs for each window 1728 for chrom in self.unique_chrom: 1729 # Extract indices corresponding to this chromosome 1730 mask_chrom = (self.variants_chrom == chrom) 1731 # Subset to this chromosome 1732 pos_chrom = self.variants_pos[mask_chrom] 1733 # Number of SNPs for this chromosome 1734 n_snps_chrom = pos_chrom.size 1735 1736 # Initialize the start of the first window with the position of the first SNP 1737 current_start = self.variants_pos[0] 1738 1739 # Number of full windows with exactly `window_size` SNPs 1740 n_full_windows = n_snps_chrom // window_size 1741 1742 # Build all but the last window 1743 for i in range(n_full_windows-1): 1744 current_end = self.variants_pos[(i+1) * window_size - 1] 1745 physical_pos.append([current_start, current_end]) 1746 chromosomes.append(chrom) 1747 window_sizes.append(window_size) 1748 current_start = self.variants_pos[(i+1) * window_size] 1749 1750 # Build the last window 1751 current_end = self.variants_pos[-1] 1752 physical_pos.append([current_start, current_end]) 1753 chromosomes.append(chrom) 1754 window_sizes.append(n_snps_chrom - ((n_full_windows - 1) * window_size)) 1755 1756 physical_pos = np.array(physical_pos) 1757 chromosomes = np.array(chromosomes) 1758 window_sizes = np.array(window_sizes) 1759 1760 # Custom start and end positions 1761 elif physical_pos is not None: 1762 # Check if there is exactly one chromosome 1763 if chromosomes is None: 1764 unique_chrom = self.unique_chrom 1765 if len(unique_chrom) == 1: 1766 # We assume all windows belong to this single chromosome 1767 single_chrom = unique_chrom[0] 1768 chromosomes = np.array([single_chrom] * physical_pos.shape[0]) 1769 else: 1770 raise ValueError("Multiple chromosomes detected, but `chromosomes` was not provided.") 1771 1772 # Match existing windows to a reference laiobj 1773 elif laiobj is not None: 1774 physical_pos = laiobj.physical_pos 1775 chromosomes = laiobj.chromosomes 1776 window_sizes = laiobj.window_sizes 1777 1778 # Allocate an output LAI array 1779 n_windows = physical_pos.shape[0] 1780 n_samples = self.n_samples 1781 if self.calldata_lai.ndim == 3: 1782 lai = np.zeros((n_windows, n_samples, 2)) 1783 else: 1784 lai = np.zeros((n_windows, n_samples*2)) 1785 1786 # For each window, find the relevant SNPs and compute the mode of the ancestries 1787 for i, ((start, end), chrom) in enumerate(zip(physical_pos, chromosomes)): 1788 snps_mask = ( 1789 (self.variants_chrom == chrom) & 1790 (self.variants_pos >= start) & 1791 (self.variants_pos <= end) 1792 ) 1793 if np.any(snps_mask): 1794 lai_mask = self.calldata_lai[snps_mask, ...] 1795 mode_ancestries = mode(lai_mask, axis=0, nan_policy='omit').mode 1796 lai[i] = mode_ancestries 1797 else: 1798 lai[i] = np.nan 1799 1800 # Generate haplotype labels, e.g. "Sample1.0", "Sample1.1" 1801 haplotypes = [f"{sample}.{i}" for sample in self.samples for i in range(2)] 1802 1803 # If original data was (n_snps, n_samples, 2), flatten to (n_windows, n_samples*2) 1804 if self.calldata_lai.ndim == 3: 1805 lai = lai.reshape(n_windows, -1) 1806 1807 # Aggregate into a LocalAncestryObject 1808 return LocalAncestryObject( 1809 haplotypes=haplotypes, 1810 lai=lai, 1811 samples=self.samples, 1812 ancestry_map=self.ancestry_map, 1813 window_sizes=window_sizes, 1814 physical_pos=physical_pos, 1815 chromosomes=chromosomes 1816 ) 1817 1818 def save(self, file: Union[str, Path]) -> None: 1819 """ 1820 Save the data stored in `self` to a specified file. 1821 1822 The format of the saved file is determined by the file extension provided in the `file` 1823 argument. 1824 1825 **Supported formats:** 1826 1827 - `.bed`: Binary PED (Plink) format. 1828 - `.pgen`: Plink2 binary genotype format. 1829 - `.vcf`: Variant Call Format. 1830 - `.pkl`: Pickle format for saving `self` in serialized form. 1831 1832 Args: 1833 file (str or pathlib.Path): 1834 Path to the file where the data will be saved. The extension of the file determines the save format. 1835 Supported extensions: `.bed`, `.pgen`, `.vcf`, `.pkl`. 1836 """ 1837 ext = Path(file).suffix.lower() 1838 if ext == '.bed': 1839 self.save_bed(file) 1840 elif ext == '.pgen': 1841 self.save_pgen(file) 1842 elif ext == '.vcf': 1843 self.save_vcf(file) 1844 elif ext == '.pkl': 1845 self.save_pickle(file) 1846 else: 1847 raise ValueError(f"Unsupported file extension: {ext}") 1848 1849 def save_bed(self, file: Union[str, Path]) -> None: 1850 """ 1851 Save the data stored in `self` to a `.bed` file. 1852 1853 Args: 1854 file (str or pathlib.Path): 1855 Path to the file where the data will be saved. It should end with `.bed`. 1856 If the provided path does not have this extension, it will be appended. 1857 """ 1858 from snputils.snp.io.write.bed import BEDWriter 1859 writer = BEDWriter(snpobj=self, filename=file) 1860 writer.write() 1861 1862 def save_pgen(self, file: Union[str, Path]) -> None: 1863 """ 1864 Save the data stored in `self` to a `.pgen` file. 1865 1866 Args: 1867 file (str or pathlib.Path): 1868 Path to the file where the data will be saved. It should end with `.pgen`. 1869 If the provided path does not have this extension, it will be appended. 1870 """ 1871 from snputils.snp.io.write.pgen import PGENWriter 1872 writer = PGENWriter(snpobj=self, filename=file) 1873 writer.write() 1874 1875 def save_vcf(self, file: Union[str, Path]) -> None: 1876 """ 1877 Save the data stored in `self` to a `.vcf` file. 1878 1879 Args: 1880 file (str or pathlib.Path): 1881 Path to the file where the data will be saved. It should end with `.vcf`. 1882 If the provided path does not have this extension, it will be appended. 1883 """ 1884 from snputils.snp.io.write.vcf import VCFWriter 1885 writer = VCFWriter(snpobj=self, filename=file) 1886 writer.write() 1887 1888 def save_pickle(self, file: Union[str, Path]) -> None: 1889 """ 1890 Save `self` in serialized form to a `.pkl` file. 1891 1892 Args: 1893 file (str or pathlib.Path): 1894 Path to the file where the data will be saved. It should end with `.pkl`. 1895 If the provided path does not have this extension, it will be appended. 1896 """ 1897 import pickle 1898 with open(file, 'wb') as file: 1899 pickle.dump(self, file) 1900 1901 @staticmethod 1902 def _match_to_replace(val: Union[str, int, float], dictionary: Dict[Any, Any], regex: bool = True) -> Union[str, int, float]: 1903 """ 1904 Find a matching key in the provided dictionary for the given value `val` 1905 and replace it with the corresponding value. 1906 1907 Args: 1908 val (str, int, or float): 1909 The value to be matched and potentially replaced. 1910 dictionary (Dict): 1911 A dictionary containing keys and values for matching and replacement. 1912 The keys should match the data type of `val`. 1913 regex (bool): 1914 If True, interprets keys in `dictionary` as regular expressions. 1915 Default is True. 1916 1917 Returns: 1918 str, int, or float: 1919 The replacement value from `dictionary` if a match is found; otherwise, the original `val`. 1920 """ 1921 if regex: 1922 # Use regular expression matching to find replacements 1923 for key, value in dictionary.items(): 1924 if isinstance(key, str): 1925 match = re.match(key, val) 1926 if match: 1927 # Replace using the first matching regex pattern 1928 return re.sub(key, value, val) 1929 # Return the original value if no regex match is found 1930 return val 1931 else: 1932 # Return the value for `val` if present in `dictionary`; otherwise, return `val` 1933 return dictionary.get(val, val) 1934 1935 @staticmethod 1936 def _get_chromosome_number(chrom_string: str) -> Union[int, str]: 1937 """ 1938 Extracts the chromosome number from the given chromosome string. 1939 1940 Args: 1941 chrom_string (str): 1942 The chromosome identifier. 1943 1944 Returns: 1945 int or str: 1946 The numeric representation of the chromosome if detected. 1947 Returns 10001 for 'X' or 'chrX', 10002 for 'Y' or 'chrY', 1948 and the original `chrom_string` if unrecognized. 1949 """ 1950 if chrom_string.isdigit(): 1951 return int(chrom_string) 1952 else: 1953 chrom_num = re.search(r'\d+', chrom_string) 1954 if chrom_num: 1955 return int(chrom_num.group()) 1956 elif chrom_string.lower() in ['x', 'chrx']: 1957 return 10001 1958 elif chrom_string.lower() in ['y', 'chry']: 1959 return 10002 1960 else: 1961 log.warning(f"Chromosome nomenclature not standard. Chromosome: {chrom_string}") 1962 return chrom_string 1963 1964 def _sanity_check(self) -> None: 1965 """ 1966 Perform sanity checks to ensure LAI and ancestry map consistency. 1967 1968 This method checks that all unique ancestries in the LAI data are represented 1969 in the ancestry map if it is provided. 1970 """ 1971 if self.__calldata_lai is not None and self.__ancestry_map is not None: 1972 unique_ancestries = np.unique(self.__calldata_lai) 1973 missing_ancestries = [anc for anc in unique_ancestries if str(anc) not in self.__ancestry_map] 1974 if missing_ancestries: 1975 warnings.warn(f"Missing ancestries in ancestry_map: {missing_ancestries}")
A class for Single Nucleotide Polymorphism (SNP) data, with optional support for SNP-level Local Ancestry Information (LAI).
22 def __init__( 23 self, 24 calldata_gt: Optional[np.ndarray] = None, 25 samples: Optional[np.ndarray] = None, 26 variants_ref: Optional[np.ndarray] = None, 27 variants_alt: Optional[np.ndarray] = None, 28 variants_chrom: Optional[np.ndarray] = None, 29 variants_filter_pass: Optional[np.ndarray] = None, 30 variants_id: Optional[np.ndarray] = None, 31 variants_pos: Optional[np.ndarray] = None, 32 variants_qual: Optional[np.ndarray] = None, 33 calldata_lai: Optional[np.ndarray] = None, 34 ancestry_map: Optional[Dict[str, str]] = None 35 ) -> None: 36 """ 37 Args: 38 calldata_gt (array, optional): 39 An array containing genotype data for each sample. This array can be either 2D with shape 40 `(n_snps, n_samples)` if the paternal and maternal strands are summed, or 3D with shape 41 `(n_snps, n_samples, 2)` if the strands are kept separate. 42 samples (array of shape (n_sampels,), optional): 43 An array containing unique sample identifiers. 44 variants_ref (array of shape (n_snps,), optional): 45 An array containing the reference allele for each SNP. 46 variants_alt (array of shape (n_snps,), optional): 47 An array containing the alternate allele for each SNP. 48 variants_chrom (array of shape (n_snps,), optional): 49 An array containing the chromosome for each SNP. 50 variants_filter_pass (array of shape (n_snps,), optional): 51 An array indicating whether each SNP passed control checks. 52 variants_id (array of shape (n_snps,), optional): 53 An array containing unique identifiers (IDs) for each SNP. 54 variants_pos (array of shape (n_snps,), optional): 55 An array containing the chromosomal positions for each SNP. 56 variants_qual (array of shape (n_snps,), optional): 57 An array containing the Phred-scaled quality score for each SNP. 58 calldata_lai (array, optional): 59 An array containing the ancestry for each SNP. This array can be either 2D with shape 60 `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2). 61 ancestry_map (dict of str to str, optional): 62 A dictionary mapping ancestry codes to region names. 63 """ 64 self.__calldata_gt = calldata_gt 65 self.__samples = samples 66 self.__variants_ref = variants_ref 67 self.__variants_alt = variants_alt 68 self.__variants_chrom = variants_chrom 69 self.__variants_filter_pass = variants_filter_pass 70 self.__variants_id = variants_id 71 self.__variants_pos = variants_pos 72 self.__variants_qual = variants_qual 73 self.__calldata_lai = calldata_lai 74 self.__ancestry_map = ancestry_map 75 76 self._sanity_check()
Arguments:
- calldata_gt (array, optional): An array containing genotype data for each sample. This array can be either 2D with shape
(n_snps, n_samples)if the paternal and maternal strands are summed, or 3D with shape(n_snps, n_samples, 2)if the strands are kept separate. - samples (array of shape (n_sampels,), optional): An array containing unique sample identifiers.
- variants_ref (array of shape (n_snps,), optional): An array containing the reference allele for each SNP.
- variants_alt (array of shape (n_snps,), optional): An array containing the alternate allele for each SNP.
- variants_chrom (array of shape (n_snps,), optional): An array containing the chromosome for each SNP.
- variants_filter_pass (array of shape (n_snps,), optional): An array indicating whether each SNP passed control checks.
- variants_id (array of shape (n_snps,), optional): An array containing unique identifiers (IDs) for each SNP.
- variants_pos (array of shape (n_snps,), optional): An array containing the chromosomal positions for each SNP.
- variants_qual (array of shape (n_snps,), optional): An array containing the Phred-scaled quality score for each SNP.
- calldata_lai (array, optional): An array containing the ancestry for each SNP. This array can be either 2D with shape
(n_snps, n_samples*2), or 3D with shape (n_snps, n_samples, 2). - ancestry_map (dict of str to str, optional): A dictionary mapping ancestry codes to region names.
98 @property 99 def calldata_gt(self) -> np.ndarray: 100 """ 101 Retrieve `calldata_gt`. 102 103 Returns: 104 **array:** 105 An array containing genotype data for each sample. This array can be either 2D with shape 106 `(n_snps, n_samples)` if the paternal and maternal strands are summed, or 3D with shape 107 `(n_snps, n_samples, 2)` if the strands are kept separate. 108 """ 109 return self.__calldata_gt
Retrieve calldata_gt.
Returns:
array: An array containing genotype data for each sample. This array can be either 2D with shape
(n_snps, n_samples)if the paternal and maternal strands are summed, or 3D with shape(n_snps, n_samples, 2)if the strands are kept separate.
118 @property 119 def samples(self) -> Optional[np.ndarray]: 120 """ 121 Retrieve `samples`. 122 123 Returns: 124 **array of shape (n_sampels,):** 125 An array containing unique sample identifiers. 126 """ 127 return self.__samples
Retrieve samples.
Returns:
array of shape (n_sampels,): An array containing unique sample identifiers.
136 @property 137 def variants_ref(self) -> Optional[np.ndarray]: 138 """ 139 Retrieve `variants_ref`. 140 141 Returns: 142 **array of shape (n_snps,):** An array containing the reference allele for each SNP. 143 """ 144 return self.__variants_ref
Retrieve variants_ref.
Returns:
array of shape (n_snps,): An array containing the reference allele for each SNP.
153 @property 154 def variants_alt(self) -> Optional[np.ndarray]: 155 """ 156 Retrieve `variants_alt`. 157 158 Returns: 159 **array of shape (n_snps,):** An array containing the alternate allele for each SNP. 160 """ 161 return self.__variants_alt
Retrieve variants_alt.
Returns:
array of shape (n_snps,): An array containing the alternate allele for each SNP.
170 @property 171 def variants_chrom(self) -> Optional[np.ndarray]: 172 """ 173 Retrieve `variants_chrom`. 174 175 Returns: 176 **array of shape (n_snps,):** An array containing the chromosome for each SNP. 177 """ 178 return self.__variants_chrom
Retrieve variants_chrom.
Returns:
array of shape (n_snps,): An array containing the chromosome for each SNP.
187 @property 188 def variants_filter_pass(self) -> Optional[np.ndarray]: 189 """ 190 Retrieve `variants_filter_pass`. 191 192 Returns: 193 **array of shape (n_snps,):** An array indicating whether each SNP passed control checks. 194 """ 195 return self.__variants_filter_pass
Retrieve variants_filter_pass.
Returns:
array of shape (n_snps,): An array indicating whether each SNP passed control checks.
204 @property 205 def variants_id(self) -> Optional[np.ndarray]: 206 """ 207 Retrieve `variants_id`. 208 209 Returns: 210 **array of shape (n_snps,):** An array containing unique identifiers (IDs) for each SNP. 211 """ 212 return self.__variants_id
Retrieve variants_id.
Returns:
array of shape (n_snps,): An array containing unique identifiers (IDs) for each SNP.
221 @property 222 def variants_pos(self) -> Optional[np.ndarray]: 223 """ 224 Retrieve `variants_pos`. 225 226 Returns: 227 **array of shape (n_snps,):** An array containing the chromosomal positions for each SNP. 228 """ 229 return self.__variants_pos
Retrieve variants_pos.
Returns:
array of shape (n_snps,): An array containing the chromosomal positions for each SNP.
238 @property 239 def variants_qual(self) -> Optional[np.ndarray]: 240 """ 241 Retrieve `variants_qual`. 242 243 Returns: 244 **array of shape (n_snps,):** An array containing the Phred-scaled quality score for each SNP. 245 """ 246 return self.__variants_qual
Retrieve variants_qual.
Returns:
array of shape (n_snps,): An array containing the Phred-scaled quality score for each SNP.
255 @property 256 def calldata_lai(self) -> Optional[np.ndarray]: 257 """ 258 Retrieve `calldata_lai`. 259 260 Returns: 261 **array:** 262 An array containing the ancestry for each SNP. This array can be either 2D with shape 263 `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2). 264 """ 265 return self.__calldata_lai
Retrieve calldata_lai.
Returns:
array: An array containing the ancestry for each SNP. This array can be either 2D with shape
(n_snps, n_samples*2), or 3D with shape (n_snps, n_samples, 2).
274 @property 275 def ancestry_map(self) -> Optional[Dict[str, str]]: 276 """ 277 Retrieve `ancestry_map`. 278 279 Returns: 280 **dict of str to str:** A dictionary mapping ancestry codes to region names. 281 """ 282 return self.__ancestry_map
Retrieve ancestry_map.
Returns:
dict of str to str: A dictionary mapping ancestry codes to region names.
291 @property 292 def n_samples(self) -> int: 293 """ 294 Retrieve `n_samples`. 295 296 Returns: 297 **int:** The total number of samples. 298 """ 299 if self.__samples is not None: 300 return len(self.__samples) 301 elif self.__calldata_gt is not None: 302 return self.__calldata_gt.shape[1] 303 elif self.__calldata_lai is not None: 304 if self.__calldata_lai.ndim == 2: 305 return self.__calldata_lai.shape[1] // 2 306 elif self.__calldata_lai.ndim == 3: 307 return self.__calldata_lai.shape[1] 308 else: 309 raise ValueError("Unable to determine the total number of samples: no relevant data is available.")
311 @property 312 def n_snps(self) -> int: 313 """ 314 Retrieve `n_snps`. 315 316 Returns: 317 **int:** The total number of SNPs. 318 """ 319 # List of attributes that can indicate the number of SNPs 320 potential_attributes = [ 321 self.__calldata_gt, 322 self.__variants_ref, 323 self.__variants_alt, 324 self.__variants_chrom, 325 self.__variants_filter_pass, 326 self.__variants_id, 327 self.__variants_pos, 328 self.__variants_qual, 329 self.__calldata_lai 330 ] 331 332 # Check each attribute for its first dimension, which corresponds to `n_snps` 333 for attr in potential_attributes: 334 if attr is not None: 335 return attr.shape[0] 336 337 raise ValueError("Unable to determine the total number of SNPs: no relevant data is available.")
339 @property 340 def n_chrom(self) -> Optional[int]: 341 """ 342 Retrieve `n_chrom`. 343 344 Returns: 345 **int:** The total number of unique chromosomes in `variants_chrom`. 346 """ 347 if self.variants_chrom is None: 348 warnings.warn("Chromosome data `variants_chrom` is None.") 349 return None 350 351 return len(self.unique_chrom)
353 @property 354 def n_ancestries(self) -> int: 355 """ 356 Retrieve `n_ancestries`. 357 358 Returns: 359 **int:** The total number of unique ancestries. 360 """ 361 if self.__calldata_lai is not None: 362 return len(np.unique(self.__calldata_lai)) 363 else: 364 raise ValueError("Unable to determine the total number of ancestries: no relevant data is available.")
366 @property 367 def unique_chrom(self) -> Optional[np.ndarray]: 368 """ 369 Retrieve `unique_chrom`. 370 371 Returns: 372 **array:** The unique chromosome names in `variants_chrom`, preserving their order of appearance. 373 """ 374 if self.variants_chrom is None: 375 warnings.warn("Chromosome data `variants_chrom` is None.") 376 return None 377 378 # Identify unique chromosome names and their first indexes of occurrence 379 _, idx = np.unique(self.variants_chrom, return_index=True) 380 # Return chromosome names sorted by their first occurrence to maintain original order 381 return self.variants_chrom[np.sort(idx)]
Retrieve unique_chrom.
Returns:
array: The unique chromosome names in
variants_chrom, preserving their order of appearance.
383 @property 384 def are_strands_summed(self) -> bool: 385 """ 386 Retrieve `are_strands_summed`. 387 388 Returns: 389 **bool:** 390 True if the maternal and paternal strands have been summed together, which is indicated by 391 `calldata_gt` having shape `(n_samples, n_snps)`. False if the strands are stored separately, 392 indicated by `calldata_gt` having shape `(n_samples, n_snps, 2)`. 393 """ 394 if self.calldata_gt is None: 395 warnings.warn("Genotype data `calldata_gt` is None.") 396 return None 397 398 return self.calldata_gt.ndim == 2
Retrieve are_strands_summed.
Returns:
bool: True if the maternal and paternal strands have been summed together, which is indicated by
calldata_gthaving shape(n_samples, n_snps). False if the strands are stored separately, indicated bycalldata_gthaving shape(n_samples, n_snps, 2).
400 def copy(self) -> 'SNPObject': 401 """ 402 Create and return a copy of `self`. 403 404 Returns: 405 **SNPObject:** 406 A new instance of the current object. 407 """ 408 return copy.deepcopy(self)
Create and return a copy of self.
Returns:
SNPObject: A new instance of the current object.
410 def keys(self) -> List[str]: 411 """ 412 Retrieve a list of public attribute names for `self`. 413 414 Returns: 415 **list of str:** 416 A list of attribute names, with internal name-mangling removed, 417 for easier reference to public attributes in the instance. 418 """ 419 return [attr.replace('_SNPObject__', '') for attr in vars(self)]
Retrieve a list of public attribute names for self.
Returns:
list of str: A list of attribute names, with internal name-mangling removed, for easier reference to public attributes in the instance.
421 def sum_strands(self, inplace: bool = False) -> Optional['SNPObject']: 422 """ 423 Sum paternal and maternal strands. 424 425 Args: 426 inplace (bool, default=False): 427 If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 428 filtered. Default is False. 429 430 Returns: 431 **Optional[SNPObject]:** 432 A new `SNPObject` with summed strands if `inplace=False`. 433 If `inplace=True`, modifies `self` in place and returns None. 434 """ 435 if self.calldata_gt is None: 436 warnings.warn("Genotype data `calldata_gt` is None.") 437 return None if not inplace else self 438 439 if self.are_strands_summed: 440 warnings.warn("Genotype data `calldata_gt` is already summed.") 441 return self if inplace else self.copy() 442 443 if inplace: 444 self.calldata_gt = self.calldata_gt.sum(axis=2, dtype=np.int8) 445 return self 446 else: 447 snpobj = self.copy() 448 snpobj.calldata_gt = self.calldata_gt.sum(axis=2, dtype=np.int8) 449 return snpobj
Sum paternal and maternal strands.
Arguments:
- inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith the variants filtered. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwith summed strands ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
451 def filter_variants( 452 self, 453 chrom: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 454 pos: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 455 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 456 include: bool = True, 457 inplace: bool = False 458 ) -> Optional['SNPObject']: 459 """ 460 Filter variants based on specified chromosome names, variant positions, or variant indexes. 461 462 This method updates the `calldata_gt`, `variants_ref`, `variants_alt`, 463 `variants_chrom`, `variants_filter_pass`, `variants_id`, `variants_pos`, 464 `variants_qual`, and `lai` attributes to include or exclude the specified variants. The filtering 465 criteria can be based on chromosome names, variant positions, or indexes. If multiple 466 criteria are provided, their union is used for filtering. The order of the variants is preserved. 467 468 Negative indexes are supported and follow 469 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 470 471 Args: 472 chrom (str or array_like of str, optional): 473 Chromosome(s) to filter variants by. Can be a single chromosome as a string or a sequence 474 of chromosomes. If both `chrom` and `pos` are provided, they must either have matching lengths 475 (pairing each chromosome with a position) or `chrom` should be a single value that applies to 476 all positions in `pos`. Default is None. 477 pos (int or array_like of int, optional): 478 Position(s) to filter variants by. Can be a single position as an integer or a sequence of positions. 479 If `chrom` is also provided, `pos` should either match `chrom` in length or `chrom` should be a 480 single value. Default is None. 481 indexes (int or array_like of int, optional): 482 Index(es) of the variants to include or exclude. Can be a single index or a sequence 483 of indexes. Negative indexes are supported. Default is None. 484 include (bool, default=True): 485 If True, includes only the specified variants. If False, excludes the specified 486 variants. Default is True. 487 inplace (bool, default=False): 488 If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 489 filtered. Default is False. 490 491 Returns: 492 **Optional[SNPObject]:** 493 A new `SNPObject` with the specified variants filtered if `inplace=False`. 494 If `inplace=True`, modifies `self` in place and returns None. 495 """ 496 if chrom is None and pos is None and indexes is None: 497 raise ValueError("At least one of 'chrom', 'pos', or 'indexes' must be provided.") 498 499 n_snps = self.n_snps 500 501 # Convert inputs to arrays for consistency 502 chrom = np.atleast_1d(chrom) if chrom is not None else None 503 pos = np.atleast_1d(pos) if pos is not None else None 504 indexes = np.atleast_1d(indexes) if indexes is not None else None 505 506 # Validate chrom and pos lengths if both are provided 507 if chrom is not None and pos is not None: 508 if len(chrom) != len(pos) and len(chrom) > 1: 509 raise ValueError( 510 "When both 'chrom' and 'pos' are provided, they must either be of the same length " 511 "or 'chrom' must be a single value." 512 ) 513 514 # Create a mask for chromosome and position filtering 515 mask_combined = np.zeros(n_snps, dtype=bool) 516 if chrom is not None and pos is not None: 517 if len(chrom) == 1: 518 # Apply single chromosome to all positions in `pos` 519 mask_combined = (self['variants_chrom'] == chrom[0]) & np.isin(self['variants_pos'], pos) 520 else: 521 # Vectorized pair matching for chrom and pos 522 query_pairs = np.array( 523 list(zip(chrom, pos)), 524 dtype=[ 525 ('chrom', self['variants_chrom'].dtype), 526 ('pos', self['variants_pos'].dtype) 527 ] 528 ) 529 data_pairs = np.array( 530 list(zip(self['variants_chrom'], self['variants_pos'])), 531 dtype=[ 532 ('chrom', self['variants_chrom'].dtype), 533 ('pos', self['variants_pos'].dtype) 534 ] 535 ) 536 mask_combined = np.isin(data_pairs, query_pairs) 537 538 elif chrom is not None: 539 # Only chromosome filtering 540 mask_combined = np.isin(self['variants_chrom'], chrom) 541 elif pos is not None: 542 # Only position filtering 543 mask_combined = np.isin(self['variants_pos'], pos) 544 545 # Create mask based on indexes if provided 546 if indexes is not None: 547 # Validate indexes, allowing negative indexes 548 out_of_bounds_indexes = indexes[(indexes < -n_snps) | (indexes >= n_snps)] 549 if out_of_bounds_indexes.size > 0: 550 raise ValueError(f"One or more sample indexes are out of bounds.") 551 552 # Handle negative indexes and check for out-of-bounds indexes 553 adjusted_indexes = np.mod(indexes, n_snps) 554 555 # Create mask for specified indexes 556 mask_indexes = np.zeros(n_snps, dtype=bool) 557 mask_indexes[adjusted_indexes] = True 558 559 # Combine with `chrom` and `pos` mask using logical OR (union of all specified criteria) 560 mask_combined = mask_combined | mask_indexes 561 562 # Invert mask if `include` is False 563 if not include: 564 mask_combined = ~mask_combined 565 566 # Define keys to filter 567 keys = [ 568 'calldata_gt', 'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 569 'variants_id', 'variants_pos', 'variants_qual', 'calldata_lai' 570 ] 571 572 # Apply filtering based on inplace parameter 573 if inplace: 574 for key in keys: 575 if self[key] is not None: 576 if self[key].ndim > 1: 577 self[key] = np.asarray(self[key])[mask_combined, ...] 578 else: 579 self[key] = np.asarray(self[key])[mask_combined] 580 581 return None 582 else: 583 # Create A new `SNPObject` with filtered data 584 snpobj = self.copy() 585 for key in keys: 586 if snpobj[key] is not None: 587 if snpobj[key].ndim > 1: 588 snpobj[key] = np.asarray(snpobj[key])[mask_combined, ...] 589 else: 590 snpobj[key] = np.asarray(snpobj[key])[mask_combined] 591 592 return snpobj
Filter variants based on specified chromosome names, variant positions, or variant indexes.
This method updates the calldata_gt, variants_ref, variants_alt,
variants_chrom, variants_filter_pass, variants_id, variants_pos,
variants_qual, and lai attributes to include or exclude the specified variants. The filtering
criteria can be based on chromosome names, variant positions, or indexes. If multiple
criteria are provided, their union is used for filtering. The order of the variants is preserved.
Negative indexes are supported and follow NumPy's indexing conventions.
Arguments:
- chrom (str or array_like of str, optional): Chromosome(s) to filter variants by. Can be a single chromosome as a string or a sequence
of chromosomes. If both
chromandposare provided, they must either have matching lengths (pairing each chromosome with a position) orchromshould be a single value that applies to all positions inpos. Default is None. - pos (int or array_like of int, optional): Position(s) to filter variants by. Can be a single position as an integer or a sequence of positions.
If
chromis also provided,posshould either matchchromin length orchromshould be a single value. Default is None. - indexes (int or array_like of int, optional): Index(es) of the variants to include or exclude. Can be a single index or a sequence of indexes. Negative indexes are supported. Default is None.
- include (bool, default=True): If True, includes only the specified variants. If False, excludes the specified variants. Default is True.
- inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith the variants filtered. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwith the specified variants filtered ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
594 def filter_samples( 595 self, 596 samples: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 597 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 598 include: bool = True, 599 reorder: bool = False, 600 inplace: bool = False 601 ) -> Optional['SNPObject']: 602 """ 603 Filter samples based on specified names or indexes. 604 605 This method updates the `samples` and `calldata_gt` attributes to include or exclude the specified 606 samples. The order of the samples is preserved. Set `reorder=True` to match the ordering of the 607 provided `samples` and/or `indexes` lists when including. 608 609 If both samples and indexes are provided, any sample matching either a name in samples or an index in 610 indexes will be included or excluded. 611 612 This method allows inclusion or exclusion of specific samples by their names or 613 indexes. When both sample names and indexes are provided, the union of the specified samples 614 is used. Negative indexes are supported and follow 615 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 616 617 Args: 618 samples (str or array_like of str, optional): 619 Name(s) of the samples to include or exclude. Can be a single sample name or a 620 sequence of sample names. Default is None. 621 indexes (int or array_like of int, optional): 622 Index(es) of the samples to include or exclude. Can be a single index or a sequence 623 of indexes. Negative indexes are supported. Default is None. 624 include (bool, default=True): 625 If True, includes only the specified samples. If False, excludes the specified 626 samples. Default is True. 627 inplace (bool, default=False): 628 If True, modifies `self` in place. If False, returns a new `SNPObject` with the samples 629 filtered. Default is False. 630 631 Returns: 632 **Optional[SNPObject]:** 633 A new `SNPObject` with the specified samples filtered if `inplace=False`. 634 If `inplace=True`, modifies `self` in place and returns None. 635 """ 636 if samples is None and indexes is None: 637 raise ValueError("At least one of 'samples' or 'indexes' must be provided.") 638 639 n_samples = self.n_samples 640 sample_names = np.array(self['samples']) 641 642 # Create mask based on sample names 643 if samples is not None: 644 samples = np.asarray(samples).ravel() 645 mask_samples = np.isin(sample_names, samples) 646 missing_samples = samples[~np.isin(samples, sample_names)] 647 if missing_samples.size > 0: 648 raise ValueError(f"The following specified samples were not found: {missing_samples.tolist()}") 649 else: 650 mask_samples = np.zeros(n_samples, dtype=bool) 651 652 # Create mask based on sample indexes 653 if indexes is not None: 654 indexes = np.asarray(indexes).ravel() 655 656 # Validate indexes, allowing negative indexes 657 out_of_bounds_indexes = indexes[(indexes < -n_samples) | (indexes >= n_samples)] 658 if out_of_bounds_indexes.size > 0: 659 raise ValueError(f"One or more sample indexes are out of bounds.") 660 661 # Handle negative indexes 662 adjusted_indexes = np.mod(indexes, n_samples) 663 664 mask_indexes = np.zeros(n_samples, dtype=bool) 665 mask_indexes[adjusted_indexes] = True 666 else: 667 mask_indexes = np.zeros(n_samples, dtype=bool) 668 669 # Combine masks using logical OR (union of samples) 670 mask_combined = mask_samples | mask_indexes 671 672 if not include: 673 mask_combined = ~mask_combined 674 675 # If requested, compute an ordering of selected samples that follows the provided lists. 676 ordered_indices = None 677 if include and reorder: 678 sel_indices = np.where(mask_combined)[0] 679 ordered_list: List[int] = [] 680 added = np.zeros(n_samples, dtype=bool) 681 682 # Prioritize the order in `samples` 683 if samples is not None: 684 name_to_idx = {name: idx for idx, name in enumerate(sample_names)} 685 for s in samples: 686 idx = name_to_idx.get(s) 687 if idx is not None and mask_combined[idx] and not added[idx]: 688 ordered_list.append(idx) 689 added[idx] = True 690 691 # Then respect the order in `indexes` 692 if indexes is not None: 693 adj_idx = np.mod(np.atleast_1d(indexes), n_samples) 694 for idx in adj_idx: 695 if mask_combined[idx] and not added[idx]: 696 ordered_list.append(int(idx)) 697 added[idx] = True 698 699 # Finally, append any remaining selected samples in their original order 700 for idx in sel_indices: 701 if not added[idx]: 702 ordered_list.append(int(idx)) 703 704 ordered_indices = np.asarray(ordered_list, dtype=int) 705 706 # Define keys to filter 707 keys = ['samples', 'calldata_gt', 'calldata_lai'] 708 709 # Apply filtering based on inplace parameter 710 if inplace: 711 for key in keys: 712 if self[key] is not None: 713 arr = np.asarray(self[key]) 714 if ordered_indices is not None: 715 if key == 'calldata_lai' and arr.ndim == 2: 716 # Haplotype-aware reordering for 2D LAI (n_snps, n_samples*2) 717 hap_idx = np.concatenate([2*ordered_indices, 2*ordered_indices + 1]) 718 self[key] = arr[:, hap_idx] 719 elif arr.ndim > 1: 720 self[key] = arr[:, ordered_indices, ...] 721 else: 722 self[key] = arr[ordered_indices] 723 else: 724 if arr.ndim > 1: 725 self[key] = arr[:, mask_combined, ...] 726 else: 727 self[key] = arr[mask_combined] 728 729 return None 730 else: 731 # Create A new `SNPObject` with filtered data 732 snpobj = self.copy() 733 for key in keys: 734 if snpobj[key] is not None: 735 arr = np.asarray(snpobj[key]) 736 if ordered_indices is not None: 737 if key == 'calldata_lai' and arr.ndim == 2: 738 hap_idx = np.concatenate([2*ordered_indices, 2*ordered_indices + 1]) 739 snpobj[key] = arr[:, hap_idx] 740 elif arr.ndim > 1: 741 snpobj[key] = arr[:, ordered_indices, ...] 742 else: 743 snpobj[key] = arr[ordered_indices] 744 else: 745 if arr.ndim > 1: 746 snpobj[key] = arr[:, mask_combined, ...] 747 else: 748 snpobj[key] = arr[mask_combined] 749 return snpobj
Filter samples based on specified names or indexes.
This method updates the samples and calldata_gt attributes to include or exclude the specified
samples. The order of the samples is preserved. Set reorder=True to match the ordering of the
provided samples and/or indexes lists when including.
If both samples and indexes are provided, any sample matching either a name in samples or an index in indexes will be included or excluded.
This method allows inclusion or exclusion of specific samples by their names or indexes. When both sample names and indexes are provided, the union of the specified samples is used. Negative indexes are supported and follow NumPy's indexing conventions.
Arguments:
- samples (str or array_like of str, optional): Name(s) of the samples to include or exclude. Can be a single sample name or a sequence of sample names. Default is None.
- indexes (int or array_like of int, optional): Index(es) of the samples to include or exclude. Can be a single index or a sequence of indexes. Negative indexes are supported. Default is None.
- include (bool, default=True): If True, includes only the specified samples. If False, excludes the specified samples. Default is True.
- inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith the samples filtered. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwith the specified samples filtered ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
751 def detect_chromosome_format(self) -> str: 752 """ 753 Detect the chromosome naming convention in `variants_chrom` based on the prefix 754 of the first chromosome identifier in `unique_chrom`. 755 756 **Recognized formats:** 757 758 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 759 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 760 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 761 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 762 763 If the format does not match any recognized pattern, `'Unknown format'` is returned. 764 765 Returns: 766 **str:** 767 A string indicating the detected chromosome format (`'chr'`, `'chm'`, `'chrom'`, or `'plain'`). 768 If no recognized format is matched, returns `'Unknown format'`. 769 """ 770 # Select the first unique chromosome identifier for format detection 771 chromosome_str = self.unique_chrom[0] 772 773 # Define regular expressions to match each recognized chromosome format 774 patterns = { 775 'chr': r'^chr(\d+|X|Y|M)$', # Matches 'chr' prefixed format 776 'chm': r'^chm(\d+|X|Y|M)$', # Matches 'chm' prefixed format 777 'chrom': r'^chrom(\d+|X|Y|M)$', # Matches 'chrom' prefixed format 778 'plain': r'^(\d+|X|Y|M)$' # Matches plain format without prefix 779 } 780 781 # Iterate through the patterns to identify the chromosome format 782 for prefix, pattern in patterns.items(): 783 if re.match(pattern, chromosome_str): 784 return prefix # Return the recognized format prefix 785 786 # If no pattern matches, return 'Unknown format' 787 return 'Unknown format'
Detect the chromosome naming convention in variants_chrom based on the prefix
of the first chromosome identifier in unique_chrom.
Recognized formats:
'chr': Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.'chm': Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.'chrom': Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.'plain': Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
If the format does not match any recognized pattern, 'Unknown format' is returned.
Returns:
str: A string indicating the detected chromosome format (
'chr','chm','chrom', or'plain'). If no recognized format is matched, returns'Unknown format'.
789 def convert_chromosome_format( 790 self, 791 from_format: str, 792 to_format: str, 793 inplace: bool = False 794 ) -> Optional['SNPObject']: 795 """ 796 Convert the chromosome format from one naming convention to another in `variants_chrom`. 797 798 **Supported formats:** 799 800 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 801 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 802 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 803 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 804 805 Args: 806 from_format (str): 807 The current chromosome format. Acceptable values are `'chr'`, `'chm'`, `'chrom'`, or `'plain'`. 808 to_format (str): 809 The target format for chromosome data conversion. Acceptable values match `from_format` options. 810 inplace (bool, default=False): 811 If True, modifies `self` in place. If False, returns a new `SNPObject` with the converted format. 812 Default is False. 813 814 Returns: 815 **Optional[SNPObject]:** A new `SNPObject` with the converted chromosome format if `inplace=False`. 816 If `inplace=True`, modifies `self` in place and returns None. 817 """ 818 # Define the list of standard chromosome identifiers 819 chrom_list = [*map(str, range(1, 23)), 'X', 'Y', 'M'] # M for mitochondrial chromosomes 820 821 # Format mappings for different chromosome naming conventions 822 format_mappings = { 823 'chr': [f'chr{i}' for i in chrom_list], 824 'chm': [f'chm{i}' for i in chrom_list], 825 'chrom': [f'chrom{i}' for i in chrom_list], 826 'plain': chrom_list, 827 } 828 829 # Verify that from_format and to_format are valid naming conventions 830 if from_format not in format_mappings or to_format not in format_mappings: 831 raise ValueError(f"Invalid format: {from_format} or {to_format}. Must be one of {list(format_mappings.keys())}.") 832 833 # Convert chromosomes to string for consistent comparison 834 variants_chrom = self['variants_chrom'].astype(str) 835 836 # Verify that all chromosomes in the object follow the specified `from_format` 837 expected_chroms = set(format_mappings[from_format]) 838 mismatched_chroms = set(variants_chrom) - expected_chroms 839 840 if mismatched_chroms: 841 raise ValueError(f"The following chromosomes do not match the `from_format` '{from_format}': {mismatched_chroms}.") 842 843 # Create conditions for selecting based on current `from_format` names 844 conditions = [variants_chrom == chrom for chrom in format_mappings[from_format]] 845 846 # Rename chromosomes based on inplace flag 847 if inplace: 848 self['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown') 849 return None 850 else: 851 snpobject = self.copy() 852 snpobject['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown') 853 return snpobject
Convert the chromosome format from one naming convention to another in variants_chrom.
Supported formats:
'chr': Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.'chm': Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.'chrom': Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.'plain': Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
Arguments:
- from_format (str): The current chromosome format. Acceptable values are
'chr','chm','chrom', or'plain'. - to_format (str): The target format for chromosome data conversion. Acceptable values match
from_formatoptions. - inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith the converted format. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwith the converted chromosome format ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
855 def match_chromosome_format(self, snpobj: 'SNPObject', inplace: bool = False) -> Optional['SNPObject']: 856 """ 857 Convert the chromosome format in `variants_chrom` from `self` to match the format of a reference `snpobj`. 858 859 **Recognized formats:** 860 861 - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'. 862 - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'. 863 - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'. 864 - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'. 865 866 Args: 867 snpobj (SNPObject): 868 The reference SNPObject whose chromosome format will be matched. 869 inplace (bool, default=False): 870 If True, modifies `self` in place. If False, returns a new `SNPObject` with the 871 chromosome format matching that of `snpobj`. Default is False. 872 873 Returns: 874 **Optional[SNPObject]:** 875 A new `SNPObject` with matched chromosome format if `inplace=False`. 876 If `inplace=True`, modifies `self` in place and returns None. 877 """ 878 # Detect the chromosome naming format of the current SNPObject 879 fmt1 = self.detect_chromosome_format() 880 if fmt1 == 'Unknown format': 881 raise ValueError("The chromosome format of the current SNPObject is unrecognized.") 882 883 # Detect the chromosome naming format of the reference SNPObject 884 fmt2 = snpobj.detect_chromosome_format() 885 if fmt2 == 'Unknown format': 886 raise ValueError("The chromosome format of the reference SNPObject is unrecognized.") 887 888 # Convert the current SNPObject's chromosome format to match the reference format 889 return self.convert_chromosome_format(fmt1, fmt2, inplace=inplace)
Convert the chromosome format in variants_chrom from self to match the format of a reference snpobj.
Recognized formats:
'chr': Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.'chm': Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.'chrom': Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.'plain': Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
Arguments:
- snpobj (SNPObject): The reference SNPObject whose chromosome format will be matched.
- inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith the chromosome format matching that ofsnpobj. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwith matched chromosome format ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
891 def rename_chrom( 892 self, 893 to_replace: Union[Dict[str, str], str, List[str]] = {'^([0-9]+)$': r'chr\1', r'^chr([0-9]+)$': r'\1'}, 894 value: Optional[Union[str, List[str]]] = None, 895 regex: bool = True, 896 inplace: bool = False 897 ) -> Optional['SNPObject']: 898 """ 899 Replace chromosome values in `variants_chrom` using patterns or exact matches. 900 901 This method allows flexible chromosome replacements, using regex or exact matches, useful 902 for non-standard chromosome formats. For standard conversions (e.g., 'chr1' to '1'), 903 consider `convert_chromosome_format`. 904 905 Args: 906 to_replace (dict, str, or list of str): 907 Pattern(s) or exact value(s) to be replaced in chromosome names. Default behavior 908 transforms `<chrom_num>` to `chr<chrom_num>` or vice versa. Non-matching values 909 remain unchanged. 910 - If str or list of str: Matches will be replaced with `value`. 911 - If regex (bool), then any regex matches will be replaced with `value`. 912 - If dict: Keys defines values to replace, with corresponding replacements as values. 913 value (str or list of str, optional): 914 Replacement value(s) if `to_replace` is a string or list. Ignored if `to_replace` 915 is a dictionary. 916 regex (bool, default=True): 917 If True, interprets `to_replace` keys as regex patterns. 918 inplace (bool, default=False): 919 If True, modifies `self` in place. If False, returns a new `SNPObject` with the chromosomes 920 renamed. Default is False. 921 922 Returns: 923 **Optional[SNPObject]:** A new `SNPObject` with the renamed chromosome format if `inplace=False`. 924 If `inplace=True`, modifies `self` in place and returns None. 925 """ 926 # Standardize input format: convert `to_replace` and `value` to a dictionary if needed 927 if isinstance(to_replace, (str, int)): 928 to_replace = [to_replace] 929 if isinstance(value, (str, int)): 930 value = [value] 931 if isinstance(to_replace, list) and isinstance(value, list): 932 dictionary = dict(zip(to_replace, value)) 933 elif isinstance(to_replace, dict) and value is None: 934 dictionary = to_replace 935 else: 936 raise ValueError( 937 "Invalid input: `to_replace` and `value` must be compatible types (both str, list of str, or dict)." 938 ) 939 940 # Vectorized function for replacing values in chromosome array 941 vec_replace_values = np.vectorize(self._match_to_replace) 942 943 # Rename chromosomes based on inplace flag 944 if inplace: 945 self.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex) 946 return None 947 else: 948 snpobj = self.copy() 949 snpobj.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex) 950 return snpobj
Replace chromosome values in variants_chrom using patterns or exact matches.
This method allows flexible chromosome replacements, using regex or exact matches, useful
for non-standard chromosome formats. For standard conversions (e.g., 'chr1' to '1'),
consider convert_chromosome_format.
Arguments:
- to_replace (dict, str, or list of str): Pattern(s) or exact value(s) to be replaced in chromosome names. Default behavior
transforms
<chrom_num>tochr<chrom_num>or vice versa. Non-matching values remain unchanged.- If str or list of str: Matches will be replaced with
value. - If regex (bool), then any regex matches will be replaced with
value. - If dict: Keys defines values to replace, with corresponding replacements as values.
- If str or list of str: Matches will be replaced with
- value (str or list of str, optional): Replacement value(s) if
to_replaceis a string or list. Ignored ifto_replaceis a dictionary. - regex (bool, default=True): If True, interprets
to_replacekeys as regex patterns. - inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith the chromosomes renamed. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwith the renamed chromosome format ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
952 def rename_missings( 953 self, 954 before: Union[int, float, str] = -1, 955 after: Union[int, float, str] = '.', 956 inplace: bool = False 957 ) -> Optional['SNPObject']: 958 """ 959 Replace missing values in the `calldata_gt` attribute. 960 961 This method identifies missing values in 'calldata_gt' and replaces them with a specified 962 value. By default, it replaces occurrences of `-1` (often used to signify missing data) with `'.'`. 963 964 Args: 965 before (int, float, or str, default=-1): 966 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 967 Default is -1. 968 after (int, float, or str, default='.'): 969 The value that will replace `before`. Default is '.'. 970 inplace (bool, default=False): 971 If True, modifies `self` in place. If False, returns a new `SNPObject` with the applied 972 replacements. Default is False. 973 974 Returns: 975 **Optional[SNPObject]:** 976 A new `SNPObject` with the renamed missing values if `inplace=False`. 977 If `inplace=True`, modifies `self` in place and returns None. 978 """ 979 # Rename missing values in the `calldata_gt` attribute based on inplace flag 980 if inplace: 981 self['calldata_gt'] = np.where(self['calldata_gt'] == before, after, self['calldata_gt']) 982 return None 983 else: 984 snpobj = self.copy() 985 snpobj['calldata_gt'] = np.where(snpobj['calldata_gt'] == before, after, snpobj['calldata_gt']) 986 return snpobj
Replace missing values in the calldata_gt attribute.
This method identifies missing values in 'calldata_gt' and replaces them with a specified
value. By default, it replaces occurrences of -1 (often used to signify missing data) with '.'.
Arguments:
- before (int, float, or str, default=-1): The current representation of missing values in
calldata_gt. Common values might be -1, '.', or NaN. Default is -1. - after (int, float, or str, default='.'): The value that will replace
before. Default is '.'. - inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith the applied replacements. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwith the renamed missing values ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
988 def get_common_variants_intersection( 989 self, 990 snpobj: 'SNPObject', 991 index_by: str = 'pos' 992 ) -> Tuple[List[str], np.ndarray, np.ndarray]: 993 """ 994 Identify common variants between `self` and the `snpobj` instance based on the specified `index_by` criterion, 995 which may match based on chromosome and position (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both. 996 997 This method returns the identifiers of common variants and their corresponding indices in both objects. 998 999 Args: 1000 snpobj (SNPObject): 1001 The reference SNPObject to compare against. 1002 index_by (str, default='pos'): 1003 Criteria for matching variants. Options: 1004 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1005 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1006 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1007 Default is 'pos'. 1008 1009 Returns: 1010 Tuple containing: 1011 - **list of str:** A list of common variant identifiers (as strings). 1012 - **array:** An array of indices in `self` where common variants are located. 1013 - **array:** An array of indices in `snpobj` where common variants are located. 1014 """ 1015 # Create unique identifiers for each variant in both SNPObjects based on the specified criterion 1016 if index_by == 'pos': 1017 query_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(self['variants_chrom'], self['variants_pos'])] 1018 reference_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(snpobj['variants_chrom'], snpobj['variants_pos'])] 1019 elif index_by == 'id': 1020 query_identifiers = self['variants_id'].tolist() 1021 reference_identifiers = snpobj['variants_id'].tolist() 1022 elif index_by == 'pos+id': 1023 query_identifiers = [ 1024 f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(self['variants_chrom'], self['variants_pos'], self['variants_id']) 1025 ] 1026 reference_identifiers = [ 1027 f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_id']) 1028 ] 1029 else: 1030 raise ValueError("`index_by` must be one of 'pos', 'id', or 'pos+id'.") 1031 1032 # Convert to sets for intersection 1033 common_ids = set(query_identifiers).intersection(reference_identifiers) 1034 1035 # Collect indices for common identifiers 1036 query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids] 1037 reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids] 1038 1039 return list(common_ids), np.array(query_idx), np.array(reference_idx)
Identify common variants between self and the snpobj instance based on the specified index_by criterion,
which may match based on chromosome and position (variants_chrom, variants_pos), ID (variants_id), or both.
This method returns the identifiers of common variants and their corresponding indices in both objects.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- index_by (str, default='pos'): Criteria for matching variants. Options:
'pos': Matches by chromosome and position (variants_chrom,variants_pos), e.g., 'chr1-12345'.'id': Matches by variant ID alone (variants_id), e.g., 'rs123'.'pos+id': Matches by chromosome, position, and ID (variants_chrom,variants_pos,variants_id), e.g., 'chr1-12345-rs123'. Default is 'pos'.
Returns:
Tuple containing:
- list of str: A list of common variant identifiers (as strings).
- array: An array of indices in
selfwhere common variants are located.- array: An array of indices in
snpobjwhere common variants are located.
1041 def get_common_markers_intersection( 1042 self, 1043 snpobj: 'SNPObject' 1044 ) -> Tuple[List[str], np.ndarray, np.ndarray]: 1045 """ 1046 Identify common markers between between `self` and the `snpobj` instance. Common markers are identified 1047 based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 1048 and alternate (`variants_alt`) alleles. 1049 1050 This method returns the identifiers of common markers and their corresponding indices in both objects. 1051 1052 Args: 1053 snpobj (SNPObject): 1054 The reference SNPObject to compare against. 1055 1056 Returns: 1057 Tuple containing: 1058 - **list of str:** A list of common variant identifiers (as strings). 1059 - **array:** An array of indices in `self` where common variants are located. 1060 - **array:** An array of indices in `snpobj` where common variants are located. 1061 """ 1062 # Generate unique identifiers based on chrom, pos, ref, and alt alleles 1063 query_identifiers = [ 1064 f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 1065 zip(self['variants_chrom'], self['variants_pos'], self['variants_ref'], self['variants_alt']) 1066 ] 1067 reference_identifiers = [ 1068 f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 1069 zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_ref'], snpobj['variants_alt']) 1070 ] 1071 1072 # Convert to sets for intersection 1073 common_ids = set(query_identifiers).intersection(reference_identifiers) 1074 1075 # Collect indices for common identifiers in both SNPObjects 1076 query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids] 1077 reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids] 1078 1079 return list(common_ids), np.array(query_idx), np.array(reference_idx)
Identify common markers between between self and the snpobj instance. Common markers are identified
based on matching chromosome (variants_chrom), position (variants_pos), reference (variants_ref),
and alternate (variants_alt) alleles.
This method returns the identifiers of common markers and their corresponding indices in both objects.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
Returns:
Tuple containing:
- list of str: A list of common variant identifiers (as strings).
- array: An array of indices in
selfwhere common variants are located.- array: An array of indices in
snpobjwhere common variants are located.
1081 def subset_to_common_variants( 1082 self, 1083 snpobj: 'SNPObject', 1084 index_by: str = 'pos', 1085 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1086 inplace: bool = False 1087 ) -> Optional['SNPObject']: 1088 """ 1089 Subset `self` to include only the common variants with a reference `snpobj` based on 1090 the specified `index_by` criterion, which may match based on chromosome and position 1091 (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both. 1092 1093 Args: 1094 snpobj (SNPObject): 1095 The reference SNPObject to compare against. 1096 index_by (str, default='pos'): 1097 Criteria for matching variants. Options: 1098 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1099 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1100 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1101 Default is 'pos'. 1102 common_variants_intersection (Tuple[np.ndarray, np.ndarray], optional): 1103 Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 1104 computed within the function. 1105 inplace (bool, default=False): 1106 If True, modifies `self` in place. If False, returns a new `SNPObject` with the common variants 1107 subsetted. Default is False. 1108 1109 Returns: 1110 **Optional[SNPObject]:** 1111 A new `SNPObject` with the common variants subsetted if `inplace=False`. 1112 If `inplace=True`, modifies `self` in place and returns None. 1113 """ 1114 # Get indices of common variants if not provided 1115 if common_variants_intersection is None: 1116 _, query_idx, _ = self.get_common_variants_intersection(snpobj, index_by=index_by) 1117 else: 1118 query_idx, _ = common_variants_intersection 1119 1120 # Use filter_variants method with the identified indices, applying `inplace` as specified 1121 return self.filter_variants(indexes=query_idx, include=True, inplace=inplace)
Subset self to include only the common variants with a reference snpobj based on
the specified index_by criterion, which may match based on chromosome and position
(variants_chrom, variants_pos), ID (variants_id), or both.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- index_by (str, default='pos'): Criteria for matching variants. Options:
'pos': Matches by chromosome and position (variants_chrom,variants_pos), e.g., 'chr1-12345'.'id': Matches by variant ID alone (variants_id), e.g., 'rs123'.'pos+id': Matches by chromosome, position, and ID (variants_chrom,variants_pos,variants_id), e.g., 'chr1-12345-rs123'. Default is 'pos'.
- common_variants_intersection (Tuple[np.ndarray, np.ndarray], optional): Precomputed indices of common variants between
selfandsnpobj. If None, intersection is computed within the function. - inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith the common variants subsetted. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwith the common variants subsetted ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
1123 def subset_to_common_markers( 1124 self, 1125 snpobj: 'SNPObject', 1126 common_markers_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1127 inplace: bool = False 1128 ) -> Optional['SNPObject']: 1129 """ 1130 Subset `self` to include only the common markers with a reference `snpobj`. Common markers are identified 1131 based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 1132 and alternate (`variants_alt`) alleles. 1133 1134 Args: 1135 snpobj (SNPObject): 1136 The reference SNPObject to compare against. 1137 common_markers_intersection (tuple of arrays, optional): 1138 Precomputed indices of common markers between `self` and `snpobj`. If None, intersection is 1139 computed within the function. 1140 inplace (bool, default=False): 1141 If True, modifies `self` in place. If False, returns a new `SNPObject` with the common markers 1142 subsetted. Default is False. 1143 1144 Returns: 1145 **Optional[SNPObject]:** 1146 A new `SNPObject` with the common markers subsetted if `inplace=False`. 1147 If `inplace=True`, modifies `self` in place and returns None. 1148 """ 1149 # Get indices of common markers if not provided 1150 if common_markers_intersection is None: 1151 _, query_idx, _ = self.get_common_markers_intersection(snpobj) 1152 else: 1153 query_idx, _ = common_markers_intersection 1154 1155 # Use filter_variants method with the identified indices, applying `inplace` as specified 1156 return self.filter_variants(indexes=query_idx, include=True, inplace=inplace)
Subset self to include only the common markers with a reference snpobj. Common markers are identified
based on matching chromosome (variants_chrom), position (variants_pos), reference (variants_ref),
and alternate (variants_alt) alleles.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- common_markers_intersection (tuple of arrays, optional): Precomputed indices of common markers between
selfandsnpobj. If None, intersection is computed within the function. - inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith the common markers subsetted. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwith the common markers subsetted ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
1158 def merge( 1159 self, 1160 snpobj: 'SNPObject', 1161 force_samples: bool = False, 1162 prefix: str = '2', 1163 inplace: bool = False 1164 ) -> Optional['SNPObject']: 1165 """ 1166 Merge `self` with `snpobj` along the sample axis. 1167 1168 This method expects both SNPObjects to contain the same set of SNPs in the same order, 1169 then combines their genotype (`calldata_gt`) and LAI (`calldata_lai`) arrays by 1170 concatenating the sample dimension. Samples from `snpobj` are appended to those in `self`. 1171 1172 Args: 1173 snpobj (SNPObject): 1174 The SNPObject to merge samples with. 1175 force_samples (bool, default=False): 1176 If True, duplicate sample names are resolved by prepending the `prefix` to duplicate sample names in 1177 `snpobj`. Otherwise, merging fails when duplicate sample names are found. Default is False. 1178 prefix (str, default='2'): 1179 A string prepended to duplicate sample names in `snpobj` when `force_samples=True`. 1180 Duplicates are renamed from `<sample_name>` to `<prefix>:<sample_name>`. For instance, 1181 if `prefix='2'` and there is a conflict with a sample called "sample_1", it becomes "2:sample_1". 1182 inplace (bool, default=False): 1183 If True, modifies `self` in place. If False, returns a new `SNPObject` with the merged samples. 1184 Default is False. 1185 1186 Returns: 1187 **Optional[SNPObject]**: A new SNPObject containing the merged sample data. 1188 """ 1189 # Merge calldata_gt if present and compatible 1190 if self.calldata_gt is not None and snpobj.calldata_gt is not None: 1191 if self.calldata_gt.shape[0] != snpobj.calldata_gt.shape[0]: 1192 raise ValueError( 1193 f"Cannot merge SNPObjects: Mismatch in the number of SNPs in `calldata_gt`.\n" 1194 f"`self.calldata_gt` has {self.calldata_gt.shape[0]} SNPs, " 1195 f"while `snpobj.calldata_gt` has {snpobj.calldata_gt.shape[0]} SNPs." 1196 ) 1197 if self.are_strands_summed and not snpobj.are_strands_summed: 1198 raise ValueError( 1199 "Cannot merge SNPObjects: `self` has summed strands, but `snpobj` does not.\n" 1200 "Ensure both objects have the same genotype summation state before merging." 1201 ) 1202 if not self.are_strands_summed and snpobj.are_strands_summed: 1203 raise ValueError( 1204 "Cannot merge SNPObjects: `snpobj` has summed strands, but `self` does not.\n" 1205 "Ensure both objects have the same genotype summation state before merging." 1206 ) 1207 calldata_gt = np.concatenate([self.calldata_gt, snpobj.calldata_gt], axis=1) 1208 else: 1209 calldata_gt = None 1210 1211 # Merge samples if present and compatible, handling duplicates if `force_samples=True` 1212 if self.samples is not None and snpobj.samples is not None: 1213 overlapping_samples = set(self.samples).intersection(set(snpobj.samples)) 1214 if overlapping_samples: 1215 if not force_samples: 1216 raise ValueError( 1217 f"Cannot merge SNPObjects: Found overlapping sample names {overlapping_samples}.\n" 1218 "Samples must be strictly non-overlapping. To allow merging with renaming, set `force_samples=True`." 1219 ) 1220 else: 1221 # Rename duplicate samples by prepending the file index 1222 renamed_samples = [f"{prefix}:{sample}" if sample in overlapping_samples else sample for sample in snpobj.samples] 1223 samples = np.concatenate([self.samples, renamed_samples], axis=0) 1224 else: 1225 samples = np.concatenate([self.samples, snpobj.samples], axis=0) 1226 else: 1227 samples = None 1228 1229 # Merge LAI data if present and compatible 1230 if self.calldata_lai is not None and snpobj.calldata_lai is not None: 1231 if self.calldata_lai.ndim != snpobj.calldata_lai.ndim: 1232 raise ValueError( 1233 f"Cannot merge SNPObjects: Mismatch in `calldata_lai` dimensions.\n" 1234 f"`self.calldata_lai` has {self.calldata_lai.ndim} dimensions, " 1235 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.ndim} dimensions." 1236 ) 1237 if self.calldata_lai.shape[0] != snpobj.calldata_lai.shape[0]: 1238 raise ValueError( 1239 f"Cannot merge SNPObjects: Mismatch in the number of SNPs in `calldata_lai`.\n" 1240 f"`self.calldata_lai` has {self.calldata_lai.shape[0]} SNPs, " 1241 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.shape[0]} SNPs." 1242 ) 1243 calldata_lai = np.concatenate([self.calldata_lai, snpobj.calldata_lai], axis=1) 1244 else: 1245 calldata_lai = None 1246 1247 if inplace: 1248 self.calldata_gt = calldata_gt 1249 self.calldata_lai = calldata_lai 1250 self.samples = samples 1251 return self 1252 1253 # Create and return a new SNPObject containing the merged samples 1254 return SNPObject( 1255 calldata_gt=calldata_gt, 1256 samples=samples, 1257 variants_ref=self.variants_ref, 1258 variants_alt=self.variants_alt, 1259 variants_chrom=self.variants_chrom, 1260 variants_filter_pass=self.variants_filter_pass, 1261 variants_id=self.variants_id, 1262 variants_pos=self.variants_pos, 1263 variants_qual=self.variants_qual, 1264 calldata_lai=calldata_lai, 1265 ancestry_map=self.ancestry_map 1266 )
Merge self with snpobj along the sample axis.
This method expects both SNPObjects to contain the same set of SNPs in the same order,
then combines their genotype (calldata_gt) and LAI (calldata_lai) arrays by
concatenating the sample dimension. Samples from snpobj are appended to those in self.
Arguments:
- snpobj (SNPObject): The SNPObject to merge samples with.
- force_samples (bool, default=False): If True, duplicate sample names are resolved by prepending the
prefixto duplicate sample names insnpobj. Otherwise, merging fails when duplicate sample names are found. Default is False. - prefix (str, default='2'): A string prepended to duplicate sample names in
snpobjwhenforce_samples=True. Duplicates are renamed from<sample_name>to<prefix>:<sample_name>. For instance, ifprefix='2'and there is a conflict with a sample called "sample_1", it becomes "2:sample_1". - inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith the merged samples. Default is False.
Returns:
Optional[SNPObject]: A new SNPObject containing the merged sample data.
1268 def concat( 1269 self, 1270 snpobj: 'SNPObject', 1271 inplace: bool = False 1272 ) -> Optional['SNPObject']: 1273 """ 1274 Concatenate self with snpobj along the SNP axis. 1275 1276 This method expects both SNPObjects to contain the same set of samples in the same order, 1277 and that the chromosome(s) in snpobj follow (i.e. have higher numeric identifiers than) 1278 those in self. 1279 1280 Args: 1281 snpobj (SNPObject): 1282 The SNPObject to concatenate SNPs with. 1283 inplace (bool, default=False): 1284 If True, modifies `self` in place. If False, returns a new `SNPObject` with the concatenated SNPs. 1285 Default is False. 1286 1287 Returns: 1288 **Optional[SNPObject]**: A new SNPObject containing the concatenated SNP data. 1289 """ 1290 # Merge calldata_gt if present and compatible 1291 if self.calldata_gt is not None and snpobj.calldata_gt is not None: 1292 if self.calldata_gt.shape[1] != snpobj.calldata_gt.shape[1]: 1293 raise ValueError( 1294 f"Cannot merge SNPObjects: Mismatch in the number of samples in `calldata_gt`.\n" 1295 f"`self.calldata_gt` has {self.calldata_gt.shape[1]} samples, " 1296 f"while `snpobj.calldata_gt` has {snpobj.calldata_gt.shape[1]} samples." 1297 ) 1298 if self.are_strands_summed and not snpobj.are_strands_summed: 1299 raise ValueError( 1300 "Cannot merge SNPObjects: `self` has summed strands, but `snpobj` does not.\n" 1301 "Ensure both objects have the same genotype summation state before merging." 1302 ) 1303 if not self.are_strands_summed and snpobj.are_strands_summed: 1304 raise ValueError( 1305 "Cannot merge SNPObjects: `snpobj` has summed strands, but `self` does not.\n" 1306 "Ensure both objects have the same genotype summation state before merging." 1307 ) 1308 calldata_gt = np.concatenate([self.calldata_gt, snpobj.calldata_gt], axis=0) 1309 else: 1310 calldata_gt = None 1311 1312 # Merge SNP-related attributes if present 1313 attributes = [ 1314 'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 'variants_id', 'variants_pos', 'variants_qual' 1315 ] 1316 merged_attrs = {} 1317 for attr in attributes: 1318 self_attr = getattr(self, attr, None) 1319 obj_attr = getattr(snpobj, attr, None) 1320 1321 # Concatenate if both present 1322 if self_attr is not None and obj_attr is not None: 1323 merged_attrs[attr] = np.concatenate([self_attr, obj_attr], axis=0) 1324 else: 1325 # If either is None, store None 1326 merged_attrs[attr] = None 1327 1328 # Merge LAI data if present and compatible 1329 if self.calldata_lai is not None and snpobj.calldata_lai is not None: 1330 if self.calldata_lai.ndim != snpobj.calldata_lai.ndim: 1331 raise ValueError( 1332 f"Cannot merge SNPObjects: Mismatch in `calldata_lai` dimensions.\n" 1333 f"`self.calldata_lai` has {self.calldata_lai.ndim} dimensions, " 1334 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.ndim} dimensions." 1335 ) 1336 if self.calldata_lai.shape[1] != snpobj.calldata_lai.shape[1]: 1337 raise ValueError( 1338 f"Cannot merge SNPObjects: Mismatch in the number of samples in `calldata_lai`.\n" 1339 f"`self.calldata_lai` has {self.calldata_lai.shape[1]} samples, " 1340 f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.shape[1]} samples." 1341 ) 1342 calldata_lai = np.concatenate([self.calldata_lai, snpobj.calldata_lai], axis=0) 1343 else: 1344 calldata_lai = None 1345 1346 if inplace: 1347 self.calldata_gt = calldata_gt 1348 self.calldata_lai = calldata_lai 1349 for attr in attributes: 1350 self[attr] = merged_attrs[attr] 1351 return self 1352 1353 # Create and return a new SNPObject containing the concatenated SNPs 1354 return SNPObject( 1355 calldata_gt=calldata_gt, 1356 calldata_lai=calldata_lai, 1357 samples=self.samples, 1358 variants_ref=merged_attrs['variants_ref'], 1359 variants_alt=merged_attrs['variants_alt'], 1360 variants_chrom=merged_attrs['variants_chrom'], 1361 variants_id=merged_attrs['variants_id'], 1362 variants_pos=merged_attrs['variants_pos'], 1363 variants_qual=merged_attrs['variants_qual'], 1364 variants_filter_pass=merged_attrs['variants_filter_pass'], 1365 ancestry_map=self.ancestry_map 1366 )
Concatenate self with snpobj along the SNP axis.
This method expects both SNPObjects to contain the same set of samples in the same order, and that the chromosome(s) in snpobj follow (i.e. have higher numeric identifiers than) those in self.
Arguments:
- snpobj (SNPObject): The SNPObject to concatenate SNPs with.
- inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith the concatenated SNPs. Default is False.
Returns:
Optional[SNPObject]: A new SNPObject containing the concatenated SNP data.
1368 def remove_strand_ambiguous_variants(self, inplace: bool = False) -> Optional['SNPObject']: 1369 """ 1370 A strand-ambiguous variant has reference (`variants_ref`) and alternate (`variants_alt`) alleles 1371 in the pairs A/T, T/A, C/G, or G/C, where both alleles are complementary and thus indistinguishable 1372 in terms of strand orientation. 1373 1374 Args: 1375 inplace (bool, default=False): 1376 If True, modifies `self` in place. If False, returns a new `SNPObject` with the 1377 strand-ambiguous variants removed. Default is False. 1378 1379 Returns: 1380 **Optional[SNPObject]:** A new `SNPObject` with non-ambiguous variants only if `inplace=False`. 1381 If `inplace=True`, modifies `self` in place and returns None. 1382 """ 1383 # Identify strand-ambiguous SNPs using vectorized comparisons 1384 is_AT = (self['variants_ref'] == 'A') & (self['variants_alt'] == 'T') 1385 is_TA = (self['variants_ref'] == 'T') & (self['variants_alt'] == 'A') 1386 is_CG = (self['variants_ref'] == 'C') & (self['variants_alt'] == 'G') 1387 is_GC = (self['variants_ref'] == 'G') & (self['variants_alt'] == 'C') 1388 1389 # Create a combined mask for all ambiguous variants 1390 ambiguous_mask = is_AT | is_TA | is_CG | is_GC 1391 non_ambiguous_idx = np.where(~ambiguous_mask)[0] 1392 1393 # Count each type of ambiguity using numpy's sum on boolean arrays 1394 A_T_count = np.sum(is_AT) 1395 T_A_count = np.sum(is_TA) 1396 C_G_count = np.sum(is_CG) 1397 G_C_count = np.sum(is_GC) 1398 1399 # Log the counts of each type of strand-ambiguous variants 1400 total_ambiguous = A_T_count + T_A_count + C_G_count + G_C_count 1401 log.info(f'{A_T_count} ambiguities of A-T type.') 1402 log.info(f'{T_A_count} ambiguities of T-A type.') 1403 log.info(f'{C_G_count} ambiguities of C-G type.') 1404 log.info(f'{G_C_count} ambiguities of G-C type.') 1405 1406 # Filter out ambiguous variants and keep non-ambiguous ones 1407 log.debug(f'Removing {total_ambiguous} strand-ambiguous variants...') 1408 return self.filter_variants(indexes=non_ambiguous_idx, include=True, inplace=inplace)
A strand-ambiguous variant has reference (variants_ref) and alternate (variants_alt) alleles
in the pairs A/T, T/A, C/G, or G/C, where both alleles are complementary and thus indistinguishable
in terms of strand orientation.
Arguments:
- inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith the strand-ambiguous variants removed. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwith non-ambiguous variants only ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
1410 def correct_flipped_variants( 1411 self, 1412 snpobj: 'SNPObject', 1413 check_complement: bool = True, 1414 index_by: str = 'pos', 1415 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1416 log_stats: bool = True, 1417 inplace: bool = False 1418 ) -> Optional['SNPObject']: 1419 """ 1420 Correct flipped variants between between `self` and a reference `snpobj`, where reference (`variants_ref`) 1421 and alternate (`variants_alt`) alleles are swapped. 1422 1423 **Flip Detection Based on `check_complement`:** 1424 1425 - If `check_complement=False`, only direct allele swaps are considered: 1426 1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1427 1428 - If `check_complement=True`, both direct and complementary swaps are considered, with four possible cases: 1429 1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1430 2. **Complement Swap of Ref:** `complement(self.variants_ref) == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`. 1431 3. **Complement Swap of Alt:** `self.variants_ref == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`. 1432 4. **Complement Swap of both Ref and Alt:** `complement(self.variants_ref) == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`. 1433 1434 **Note:** Variants where `self.variants_ref == self.variants_alt` are ignored as they are ambiguous. 1435 1436 **Correction Process:** 1437 - Swaps `variants_ref` and `variants_alt` alleles in `self` to align with `snpobj`. 1438 - Flips `calldata_gt` values (0 becomes 1, and 1 becomes 0) to match the updated allele configuration. 1439 1440 Args: 1441 snpobj (SNPObject): 1442 The reference SNPObject to compare against. 1443 check_complement (bool, default=True): 1444 If True, also checks for complementary base pairs (A/T, T/A, C/G, and G/C) when identifying swapped variants. 1445 Default is True. 1446 index_by (str, default='pos'): 1447 Criteria for matching variants. Options: 1448 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1449 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1450 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1451 Default is 'pos'. 1452 common_variants_intersection (tuple of arrays, optional): 1453 Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 1454 computed within the function. 1455 log_stats (bool, default=True): 1456 If True, logs statistical information about matching and ambiguous alleles. Default is True. 1457 inplace (bool, default=False): 1458 If True, modifies `self` in place. If False, returns a new `SNPObject` with corrected 1459 flips. Default is False. 1460 1461 Returns: 1462 **Optional[SNPObject]**: 1463 A new `SNPObject` with corrected flips if `inplace=False`. 1464 If `inplace=True`, modifies `self` in place and returns None. 1465 """ 1466 # Define complement mappings for nucleotides 1467 complement_map = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'} 1468 1469 # Helper function to get the complement of a base 1470 def get_complement(base: str) -> str: 1471 return complement_map.get(base, base) 1472 1473 # Get common variant indices if not provided 1474 if common_variants_intersection != None: 1475 query_idx, reference_idx = common_variants_intersection 1476 else: 1477 _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by) 1478 1479 # Log statistics on matching alleles if enabled 1480 if log_stats: 1481 matching_ref = np.sum(self['variants_ref'][query_idx] == snpobj['variants_ref'][reference_idx]) 1482 matching_alt = np.sum(self['variants_alt'][query_idx] == snpobj['variants_alt'][reference_idx]) 1483 ambiguous = np.sum(self['variants_ref'][query_idx] == self['variants_alt'][query_idx]) 1484 log.info(f"Matching reference alleles (ref=ref'): {matching_ref}, Matching alternate alleles (alt=alt'): {matching_alt}.") 1485 log.info(f"Number of ambiguous alleles (ref=alt): {ambiguous}.") 1486 1487 # Identify indices where `ref` and `alt` alleles are swapped 1488 if not check_complement: 1489 # Simple exact match for swapped alleles 1490 swapped_ref = (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) 1491 swapped_alt = (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) 1492 else: 1493 # Check for swapped or complementary-swapped alleles 1494 swapped_ref = ( 1495 (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) | 1496 (np.vectorize(get_complement)(self['variants_ref'][query_idx]) == snpobj['variants_alt'][reference_idx]) 1497 ) 1498 swapped_alt = ( 1499 (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) | 1500 (np.vectorize(get_complement)(self['variants_alt'][query_idx]) == snpobj['variants_ref'][reference_idx]) 1501 ) 1502 1503 # Filter out ambiguous variants where `ref` and `alt` alleles match (ref=alt) 1504 not_ambiguous = (self['variants_ref'][query_idx] != self['variants_alt'][query_idx]) 1505 1506 # Indices in `self` of flipped variants 1507 flip_idx_query = query_idx[swapped_ref & swapped_alt & not_ambiguous] 1508 1509 # Correct the identified variant flips 1510 if len(flip_idx_query) > 0: 1511 log.info(f'Correcting {len(flip_idx_query)} variant flips...') 1512 1513 temp_alts = self['variants_alt'][flip_idx_query] 1514 temp_refs = self['variants_ref'][flip_idx_query] 1515 1516 # Correct the variant flips based on whether the operation is in-place or not 1517 if inplace: 1518 self['variants_alt'][flip_idx_query] = temp_refs 1519 self['variants_ref'][flip_idx_query] = temp_alts 1520 self['calldata_gt'][flip_idx_query] = 1 - self['calldata_gt'][flip_idx_query] 1521 return None 1522 else: 1523 snpobj = self.copy() 1524 snpobj['variants_alt'][flip_idx_query] = temp_refs 1525 snpobj['variants_ref'][flip_idx_query] = temp_alts 1526 snpobj['calldata_gt'][flip_idx_query] = 1 - snpobj['calldata_gt'][flip_idx_query] 1527 return snpobj 1528 else: 1529 log.info('No variant flips found to correct.') 1530 return self if not inplace else None
Correct flipped variants between between self and a reference snpobj, where reference (variants_ref)
and alternate (variants_alt) alleles are swapped.
Flip Detection Based on check_complement:
If
check_complement=False, only direct allele swaps are considered:- Direct Swap:
self.variants_ref == snpobj.variants_altandself.variants_alt == snpobj.variants_ref.
- Direct Swap:
If
check_complement=True, both direct and complementary swaps are considered, with four possible cases:- Direct Swap:
self.variants_ref == snpobj.variants_altandself.variants_alt == snpobj.variants_ref. - Complement Swap of Ref:
complement(self.variants_ref) == snpobj.variants_altandself.variants_alt == snpobj.variants_ref. - Complement Swap of Alt:
self.variants_ref == snpobj.variants_altandcomplement(self.variants_alt) == snpobj.variants_ref. - Complement Swap of both Ref and Alt:
complement(self.variants_ref) == snpobj.variants_altandcomplement(self.variants_alt) == snpobj.variants_ref.
- Direct Swap:
Note: Variants where self.variants_ref == self.variants_alt are ignored as they are ambiguous.
Correction Process:
- Swaps
variants_refandvariants_altalleles inselfto align withsnpobj. - Flips
calldata_gtvalues (0 becomes 1, and 1 becomes 0) to match the updated allele configuration.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- check_complement (bool, default=True): If True, also checks for complementary base pairs (A/T, T/A, C/G, and G/C) when identifying swapped variants. Default is True.
- index_by (str, default='pos'): Criteria for matching variants. Options:
'pos': Matches by chromosome and position (variants_chrom,variants_pos), e.g., 'chr1-12345'.'id': Matches by variant ID alone (variants_id), e.g., 'rs123'.'pos+id': Matches by chromosome, position, and ID (variants_chrom,variants_pos,variants_id), e.g., 'chr1-12345-rs123'. Default is 'pos'.
- common_variants_intersection (tuple of arrays, optional): Precomputed indices of common variants between
selfandsnpobj. If None, intersection is computed within the function. - log_stats (bool, default=True): If True, logs statistical information about matching and ambiguous alleles. Default is True.
- inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith corrected flips. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwith corrected flips ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
1532 def remove_mismatching_variants( 1533 self, 1534 snpobj: 'SNPObject', 1535 index_by: str = 'pos', 1536 common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None, 1537 inplace: bool = False 1538 ) -> Optional['SNPObject']: 1539 """ 1540 Remove variants from `self`, where reference (`variants_ref`) and/or alternate (`variants_alt`) alleles 1541 do not match with a reference `snpobj`. 1542 1543 Args: 1544 snpobj (SNPObject): 1545 The reference SNPObject to compare against. 1546 index_by (str, default='pos'): 1547 Criteria for matching variants. Options: 1548 - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'. 1549 - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'. 1550 - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'. 1551 Default is 'pos'. 1552 common_variants_intersection (tuple of arrays, optional): 1553 Precomputed indices of common variants between `self` and the reference `snpobj`. 1554 If None, the intersection is computed within the function. 1555 inplace (bool, default=False): 1556 If True, modifies `self` in place. If False, returns a new `SNPObject` without 1557 mismatching variants. Default is False. 1558 1559 Returns: 1560 **Optional[SNPObject]:** 1561 A new `SNPObject` without mismatching variants if `inplace=False`. 1562 If `inplace=True`, modifies `self` in place and returns None. 1563 """ 1564 # Get common variant indices if not provided 1565 if common_variants_intersection is not None: 1566 query_idx, reference_idx = common_variants_intersection 1567 else: 1568 _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by) 1569 1570 # Vectorized comparison of `ref` and `alt` alleles 1571 ref_mismatch = self['variants_ref'][query_idx] != snpobj['variants_ref'][reference_idx] 1572 alt_mismatch = self['variants_alt'][query_idx] != snpobj['variants_alt'][reference_idx] 1573 mismatch_mask = ref_mismatch | alt_mismatch 1574 1575 # Identify indices in `self` of mismatching variants 1576 mismatch_idx = query_idx[mismatch_mask] 1577 1578 # Compute total number of variant mismatches 1579 total_mismatches = np.sum(mismatch_mask) 1580 1581 # Filter out mismatching variants 1582 log.debug(f'Removing {total_mismatches} mismatching variants...') 1583 return self.filter_variants(indexes=mismatch_idx, include=True, inplace=inplace)
Remove variants from self, where reference (variants_ref) and/or alternate (variants_alt) alleles
do not match with a reference snpobj.
Arguments:
- snpobj (SNPObject): The reference SNPObject to compare against.
- index_by (str, default='pos'): Criteria for matching variants. Options:
'pos': Matches by chromosome and position (variants_chrom,variants_pos), e.g., 'chr1-12345'.'id': Matches by variant ID alone (variants_id), e.g., 'rs123'.'pos+id': Matches by chromosome, position, and ID (variants_chrom,variants_pos,variants_id), e.g., 'chr1-12345-rs123'. Default is 'pos'.
- common_variants_intersection (tuple of arrays, optional): Precomputed indices of common variants between
selfand the referencesnpobj. If None, the intersection is computed within the function. - inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwithout mismatching variants. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwithout mismatching variants ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
1585 def shuffle_variants(self, inplace: bool = False) -> Optional['SNPObject']: 1586 """ 1587 Randomly shuffle the positions of variants in the SNPObject, ensuring that all associated 1588 data (e.g., `calldata_gt` and variant-specific attributes) remain aligned. 1589 1590 Args: 1591 inplace (bool, default=False): 1592 If True, modifies `self` in place. If False, returns a new `SNPObject` with 1593 shuffled variants. Default is False. 1594 1595 Returns: 1596 **Optional[SNPObject]:** 1597 A new `SNPObject` without shuffled variant positions if `inplace=False`. 1598 If `inplace=True`, modifies `self` in place and returns None. 1599 """ 1600 # Generate a random permutation index for shuffling variant positions 1601 shuffle_index = np.random.permutation(self.n_snps) 1602 1603 # Apply shuffling to all relevant attributes using the class's dictionary-like interface 1604 if inplace: 1605 for key in self.keys(): 1606 if self[key] is not None: 1607 if key == 'calldata_gt': 1608 # `calldata_gt`` has a different shape, so it's shuffled along axis 0 1609 self[key] = self[key][shuffle_index, ...] 1610 elif 'variant' in key: 1611 # snpobj attributes are 1D arrays 1612 self[key] = np.asarray(self[key])[shuffle_index] 1613 return None 1614 else: 1615 shuffled_snpobj = self.copy() 1616 for key in shuffled_snpobj.keys(): 1617 if shuffled_snpobj[key] is not None: 1618 if key == 'calldata_gt': 1619 shuffled_snpobj[key] = shuffled_snpobj[key][shuffle_index, ...] 1620 elif 'variant' in key: 1621 shuffled_snpobj[key] = np.asarray(shuffled_snpobj[key])[shuffle_index] 1622 return shuffled_snpobj
Randomly shuffle the positions of variants in the SNPObject, ensuring that all associated
data (e.g., calldata_gt and variant-specific attributes) remain aligned.
Arguments:
- inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith shuffled variants. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwithout shuffled variant positions ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
1624 def set_empty_to_missing(self, inplace: bool = False) -> Optional['SNPObject']: 1625 """ 1626 Replace empty strings `''` with missing values `'.'` in attributes of `self`. 1627 1628 Args: 1629 inplace (bool, default=False): 1630 If True, modifies `self` in place. If False, returns a new `SNPObject` with empty 1631 strings `''` replaced by missing values `'.'`. Default is False. 1632 1633 Returns: 1634 **Optional[SNPObject]:** 1635 A new `SNPObject` with empty strings replaced if `inplace=False`. 1636 If `inplace=True`, modifies `self` in place and returns None. 1637 """ 1638 if inplace: 1639 if self.variants_alt is not None: 1640 self.variants_alt[self.variants_alt == ''] = '.' 1641 if self.variants_ref is not None: 1642 self.variants_ref[self.variants_ref == ''] = '.' 1643 if self.variants_qual is not None: 1644 self.variants_qual = self.variants_qual.astype(str) 1645 self.variants_qual[(self.variants_qual == '') | (self.variants_qual == 'nan')] = '.' 1646 if self.variants_chrom is not None: 1647 self.variants_chrom = self.variants_chrom.astype(str) 1648 self.variants_chrom[self.variants_chrom == ''] = '.' 1649 if self.variants_filter_pass is not None: 1650 self.variants_filter_pass[self.variants_filter_pass == ''] = '.' 1651 if self.variants_id is not None: 1652 self.variants_id[self.variants_id == ''] = '.' 1653 return self 1654 else: 1655 snpobj = self.copy() 1656 if snpobj.variants_alt is not None: 1657 snpobj.variants_alt[snpobj.variants_alt == ''] = '.' 1658 if snpobj.variants_ref is not None: 1659 snpobj.variants_ref[snpobj.variants_ref == ''] = '.' 1660 if snpobj.variants_qual is not None: 1661 snpobj.variants_qual = snpobj.variants_qual.astype(str) 1662 snpobj.variants_qual[(snpobj.variants_qual == '') | (snpobj.variants_qual == 'nan')] = '.' 1663 if snpobj.variants_chrom is not None: 1664 snpobj.variants_chrom[snpobj.variants_chrom == ''] = '.' 1665 if snpobj.variants_filter_pass is not None: 1666 snpobj.variants_filter_pass[snpobj.variants_filter_pass == ''] = '.' 1667 if snpobj.variants_id is not None: 1668 snpobj.variants_id[snpobj.variants_id == ''] = '.' 1669 return snpobj
Replace empty strings '' with missing values '.' in attributes of self.
Arguments:
- inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newSNPObjectwith empty strings''replaced by missing values'.'. Default is False.
Returns:
Optional[SNPObject]: A new
SNPObjectwith empty strings replaced ifinplace=False. Ifinplace=True, modifiesselfin place and returns None.
1671 def convert_to_window_level( 1672 self, 1673 window_size: Optional[int] = None, 1674 physical_pos: Optional[np.ndarray] = None, 1675 chromosomes: Optional[np.ndarray] = None, 1676 window_sizes: Optional[np.ndarray] = None, 1677 laiobj: Optional['LocalAncestryObject'] = None 1678 ) -> 'LocalAncestryObject': 1679 """ 1680 Aggregate the `calldata_lai` attribute into genomic windows within a 1681 `snputils.ancestry.genobj.LocalAncestryObject`. 1682 1683 **Options for defining windows (in order of precedence):** 1684 1685 1. **Fixed window size**: 1686 - Use `window_size` to specify how many SNPs go into each window. The last window on each 1687 chromosome may be larger if SNPs are not evenly divisible by the size. 1688 1689 2. **Custom start and end positions**: 1690 - Provide `physical_pos` (2D array of shape (n_windows, 2)) as the [start, end] base-pair 1691 coordinates for each window. 1692 - If `chromosomes` is not provided and `self` has exactly one chromosome, all windows are 1693 assumed to belong to that chromosome. 1694 - If multiple chromosomes exist but `chromosomes` is missing, an error will be raised. 1695 - Optionally, provide `window_sizes` to store the SNP count per-window. 1696 1697 3. **Matching existing windows**: 1698 - Reuse window definitions (`physical_pos`, `chromosomes`, `window_sizes`) from an existing `laiobj`. 1699 1700 Args: 1701 window_size (int, optional): 1702 Number of SNPs in each window if defining fixed-size windows. If the total number of 1703 SNPs in a chromosome is not evenly divisible by the window size, the last window on that 1704 chromosome will include all remaining SNPs and therefore be larger than the specified size. 1705 physical_pos (array of shape (n_windows, 2), optional): 1706 A 2D array containing the start and end physical positions for each window. 1707 chromosomes (array of shape (n_windows,), optional): 1708 An array with chromosome numbers corresponding to each genomic window. 1709 window_sizes (array of shape (n_windows,), optional): 1710 An array specifying the number of SNPs in each genomic window. 1711 laiobj (LocalAncestryObject, optional): 1712 A reference `LocalAncestryObject` from which to copy existing window definitions. 1713 1714 Returns: 1715 **LocalAncestryObject:** 1716 A LocalAncestryObject containing window-level ancestry data. 1717 """ 1718 from snputils.ancestry.genobj.local import LocalAncestryObject 1719 1720 if window_size is None and physical_pos is None and laiobj is None: 1721 raise ValueError("One of `window_size`, `physical_pos`, or `laiobj` must be provided.") 1722 1723 # Fixed window size 1724 if window_size is not None: 1725 physical_pos = [] # Boundaries [start, end] of each window 1726 chromosomes = [] # Chromosome for each window 1727 window_sizes = [] # Number of SNPs for each window 1728 for chrom in self.unique_chrom: 1729 # Extract indices corresponding to this chromosome 1730 mask_chrom = (self.variants_chrom == chrom) 1731 # Subset to this chromosome 1732 pos_chrom = self.variants_pos[mask_chrom] 1733 # Number of SNPs for this chromosome 1734 n_snps_chrom = pos_chrom.size 1735 1736 # Initialize the start of the first window with the position of the first SNP 1737 current_start = self.variants_pos[0] 1738 1739 # Number of full windows with exactly `window_size` SNPs 1740 n_full_windows = n_snps_chrom // window_size 1741 1742 # Build all but the last window 1743 for i in range(n_full_windows-1): 1744 current_end = self.variants_pos[(i+1) * window_size - 1] 1745 physical_pos.append([current_start, current_end]) 1746 chromosomes.append(chrom) 1747 window_sizes.append(window_size) 1748 current_start = self.variants_pos[(i+1) * window_size] 1749 1750 # Build the last window 1751 current_end = self.variants_pos[-1] 1752 physical_pos.append([current_start, current_end]) 1753 chromosomes.append(chrom) 1754 window_sizes.append(n_snps_chrom - ((n_full_windows - 1) * window_size)) 1755 1756 physical_pos = np.array(physical_pos) 1757 chromosomes = np.array(chromosomes) 1758 window_sizes = np.array(window_sizes) 1759 1760 # Custom start and end positions 1761 elif physical_pos is not None: 1762 # Check if there is exactly one chromosome 1763 if chromosomes is None: 1764 unique_chrom = self.unique_chrom 1765 if len(unique_chrom) == 1: 1766 # We assume all windows belong to this single chromosome 1767 single_chrom = unique_chrom[0] 1768 chromosomes = np.array([single_chrom] * physical_pos.shape[0]) 1769 else: 1770 raise ValueError("Multiple chromosomes detected, but `chromosomes` was not provided.") 1771 1772 # Match existing windows to a reference laiobj 1773 elif laiobj is not None: 1774 physical_pos = laiobj.physical_pos 1775 chromosomes = laiobj.chromosomes 1776 window_sizes = laiobj.window_sizes 1777 1778 # Allocate an output LAI array 1779 n_windows = physical_pos.shape[0] 1780 n_samples = self.n_samples 1781 if self.calldata_lai.ndim == 3: 1782 lai = np.zeros((n_windows, n_samples, 2)) 1783 else: 1784 lai = np.zeros((n_windows, n_samples*2)) 1785 1786 # For each window, find the relevant SNPs and compute the mode of the ancestries 1787 for i, ((start, end), chrom) in enumerate(zip(physical_pos, chromosomes)): 1788 snps_mask = ( 1789 (self.variants_chrom == chrom) & 1790 (self.variants_pos >= start) & 1791 (self.variants_pos <= end) 1792 ) 1793 if np.any(snps_mask): 1794 lai_mask = self.calldata_lai[snps_mask, ...] 1795 mode_ancestries = mode(lai_mask, axis=0, nan_policy='omit').mode 1796 lai[i] = mode_ancestries 1797 else: 1798 lai[i] = np.nan 1799 1800 # Generate haplotype labels, e.g. "Sample1.0", "Sample1.1" 1801 haplotypes = [f"{sample}.{i}" for sample in self.samples for i in range(2)] 1802 1803 # If original data was (n_snps, n_samples, 2), flatten to (n_windows, n_samples*2) 1804 if self.calldata_lai.ndim == 3: 1805 lai = lai.reshape(n_windows, -1) 1806 1807 # Aggregate into a LocalAncestryObject 1808 return LocalAncestryObject( 1809 haplotypes=haplotypes, 1810 lai=lai, 1811 samples=self.samples, 1812 ancestry_map=self.ancestry_map, 1813 window_sizes=window_sizes, 1814 physical_pos=physical_pos, 1815 chromosomes=chromosomes 1816 )
Aggregate the calldata_lai attribute into genomic windows within a
snputils.ancestry.genobj.LocalAncestryObject.
Options for defining windows (in order of precedence):
- Fixed window size:
- Use
window_sizeto specify how many SNPs go into each window. The last window on each chromosome may be larger if SNPs are not evenly divisible by the size.
- Custom start and end positions:
- Provide
physical_pos(2D array of shape (n_windows, 2)) as the [start, end] base-pair coordinates for each window. - If
chromosomesis not provided andselfhas exactly one chromosome, all windows are assumed to belong to that chromosome. - If multiple chromosomes exist but
chromosomesis missing, an error will be raised. - Optionally, provide
window_sizesto store the SNP count per-window.
- Matching existing windows:
- Reuse window definitions (
physical_pos,chromosomes,window_sizes) from an existinglaiobj.
Arguments:
- window_size (int, optional): Number of SNPs in each window if defining fixed-size windows. If the total number of SNPs in a chromosome is not evenly divisible by the window size, the last window on that chromosome will include all remaining SNPs and therefore be larger than the specified size.
- physical_pos (array of shape (n_windows, 2), optional): A 2D array containing the start and end physical positions for each window.
- chromosomes (array of shape (n_windows,), optional): An array with chromosome numbers corresponding to each genomic window.
- window_sizes (array of shape (n_windows,), optional): An array specifying the number of SNPs in each genomic window.
- laiobj (LocalAncestryObject, optional): A reference
LocalAncestryObjectfrom which to copy existing window definitions.
Returns:
LocalAncestryObject: A LocalAncestryObject containing window-level ancestry data.
1818 def save(self, file: Union[str, Path]) -> None: 1819 """ 1820 Save the data stored in `self` to a specified file. 1821 1822 The format of the saved file is determined by the file extension provided in the `file` 1823 argument. 1824 1825 **Supported formats:** 1826 1827 - `.bed`: Binary PED (Plink) format. 1828 - `.pgen`: Plink2 binary genotype format. 1829 - `.vcf`: Variant Call Format. 1830 - `.pkl`: Pickle format for saving `self` in serialized form. 1831 1832 Args: 1833 file (str or pathlib.Path): 1834 Path to the file where the data will be saved. The extension of the file determines the save format. 1835 Supported extensions: `.bed`, `.pgen`, `.vcf`, `.pkl`. 1836 """ 1837 ext = Path(file).suffix.lower() 1838 if ext == '.bed': 1839 self.save_bed(file) 1840 elif ext == '.pgen': 1841 self.save_pgen(file) 1842 elif ext == '.vcf': 1843 self.save_vcf(file) 1844 elif ext == '.pkl': 1845 self.save_pickle(file) 1846 else: 1847 raise ValueError(f"Unsupported file extension: {ext}")
Save the data stored in self to a specified file.
The format of the saved file is determined by the file extension provided in the file
argument.
Supported formats:
.bed: Binary PED (Plink) format..pgen: Plink2 binary genotype format..vcf: Variant Call Format..pkl: Pickle format for savingselfin serialized form.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. The extension of the file determines the save format.
Supported extensions:
.bed,.pgen,.vcf,.pkl.
1849 def save_bed(self, file: Union[str, Path]) -> None: 1850 """ 1851 Save the data stored in `self` to a `.bed` file. 1852 1853 Args: 1854 file (str or pathlib.Path): 1855 Path to the file where the data will be saved. It should end with `.bed`. 1856 If the provided path does not have this extension, it will be appended. 1857 """ 1858 from snputils.snp.io.write.bed import BEDWriter 1859 writer = BEDWriter(snpobj=self, filename=file) 1860 writer.write()
Save the data stored in self to a .bed file.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.bed. If the provided path does not have this extension, it will be appended.
1862 def save_pgen(self, file: Union[str, Path]) -> None: 1863 """ 1864 Save the data stored in `self` to a `.pgen` file. 1865 1866 Args: 1867 file (str or pathlib.Path): 1868 Path to the file where the data will be saved. It should end with `.pgen`. 1869 If the provided path does not have this extension, it will be appended. 1870 """ 1871 from snputils.snp.io.write.pgen import PGENWriter 1872 writer = PGENWriter(snpobj=self, filename=file) 1873 writer.write()
Save the data stored in self to a .pgen file.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.pgen. If the provided path does not have this extension, it will be appended.
1875 def save_vcf(self, file: Union[str, Path]) -> None: 1876 """ 1877 Save the data stored in `self` to a `.vcf` file. 1878 1879 Args: 1880 file (str or pathlib.Path): 1881 Path to the file where the data will be saved. It should end with `.vcf`. 1882 If the provided path does not have this extension, it will be appended. 1883 """ 1884 from snputils.snp.io.write.vcf import VCFWriter 1885 writer = VCFWriter(snpobj=self, filename=file) 1886 writer.write()
Save the data stored in self to a .vcf file.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.vcf. If the provided path does not have this extension, it will be appended.
1888 def save_pickle(self, file: Union[str, Path]) -> None: 1889 """ 1890 Save `self` in serialized form to a `.pkl` file. 1891 1892 Args: 1893 file (str or pathlib.Path): 1894 Path to the file where the data will be saved. It should end with `.pkl`. 1895 If the provided path does not have this extension, it will be appended. 1896 """ 1897 import pickle 1898 with open(file, 'wb') as file: 1899 pickle.dump(self, file)
Save self in serialized form to a .pkl file.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.pkl. If the provided path does not have this extension, it will be appended.