snputils.snp.genobj

1from .snpobj import SNPObject
2
3__all__ = ['SNPObject']
class SNPObject:
  17class SNPObject:
  18    """
  19    A class for Single Nucleotide Polymorphism (SNP) data, with optional support for 
  20    SNP-level Local Ancestry Information (LAI).
  21    """
  22    def __init__(
  23        self,
  24        calldata_gt: Optional[np.ndarray] = None,
  25        samples: Optional[np.ndarray] = None,
  26        variants_ref: Optional[np.ndarray] = None,
  27        variants_alt: Optional[np.ndarray] = None,
  28        variants_chrom: Optional[np.ndarray] = None,
  29        variants_filter_pass: Optional[np.ndarray] = None,
  30        variants_id: Optional[np.ndarray] = None,
  31        variants_pos: Optional[np.ndarray] = None,
  32        variants_qual: Optional[np.ndarray] = None,
  33        calldata_lai: Optional[np.ndarray] = None,
  34        ancestry_map: Optional[Dict[str, str]] = None
  35    ) -> None:
  36        """
  37        Args:
  38            calldata_gt (array, optional): 
  39                An array containing genotype data for each sample. This array can be either 2D with shape 
  40                `(n_snps, n_samples)` if the paternal and maternal strands are summed, or 3D with shape 
  41                `(n_snps, n_samples, 2)` if the strands are kept separate.
  42            samples (array of shape (n_sampels,), optional): 
  43                An array containing unique sample identifiers.
  44            variants_ref (array of shape (n_snps,), optional): 
  45                An array containing the reference allele for each SNP.
  46            variants_alt (array of shape (n_snps,), optional): 
  47                An array containing the alternate allele for each SNP.
  48            variants_chrom (array of shape (n_snps,), optional): 
  49                An array containing the chromosome for each SNP.
  50            variants_filter_pass (array of shape (n_snps,), optional): 
  51                An array indicating whether each SNP passed control checks.
  52            variants_id (array of shape (n_snps,), optional): 
  53                An array containing unique identifiers (IDs) for each SNP.
  54            variants_pos (array of shape (n_snps,), optional): 
  55                An array containing the chromosomal positions for each SNP.
  56            variants_qual (array of shape (n_snps,), optional): 
  57                An array containing the Phred-scaled quality score for each SNP.
  58            calldata_lai (array, optional): 
  59                An array containing the ancestry for each SNP. This array can be either 2D with shape
  60                `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2).
  61            ancestry_map (dict of str to str, optional): 
  62                A dictionary mapping ancestry codes to region names.
  63        """
  64        self.__calldata_gt = calldata_gt
  65        self.__samples = samples
  66        self.__variants_ref = variants_ref
  67        self.__variants_alt = variants_alt
  68        self.__variants_chrom = variants_chrom
  69        self.__variants_filter_pass = variants_filter_pass
  70        self.__variants_id = variants_id
  71        self.__variants_pos = variants_pos
  72        self.__variants_qual = variants_qual
  73        self.__calldata_lai = calldata_lai
  74        self.__ancestry_map = ancestry_map
  75
  76        self._sanity_check()
  77
  78    def __getitem__(self, key: str) -> Any:
  79        """
  80        To access an attribute of the class using the square bracket notation,
  81        similar to a dictionary.
  82        """
  83        try:
  84            return getattr(self, key)
  85        except:
  86            raise KeyError(f'Invalid key: {key}.')
  87
  88    def __setitem__(self, key: str, value: Any):
  89        """
  90        To set an attribute of the class using the square bracket notation,
  91        similar to a dictionary.
  92        """
  93        try:
  94            setattr(self, key, value)
  95        except:
  96            raise KeyError(f'Invalid key: {key}.')
  97
  98    @property
  99    def calldata_gt(self) -> np.ndarray:
 100        """
 101        Retrieve `calldata_gt`.
 102
 103        Returns:
 104            **array:** 
 105                An array containing genotype data for each sample. This array can be either 2D with shape 
 106                `(n_snps, n_samples)` if the paternal and maternal strands are summed, or 3D with shape 
 107                `(n_snps, n_samples, 2)` if the strands are kept separate.
 108        """
 109        return self.__calldata_gt
 110
 111    @calldata_gt.setter
 112    def calldata_gt(self, x: np.ndarray):
 113        """
 114        Update `calldata_gt`.
 115        """
 116        self.__calldata_gt = x
 117
 118    @property
 119    def samples(self) -> Optional[np.ndarray]:
 120        """
 121        Retrieve `samples`.
 122
 123        Returns:
 124            **array of shape (n_sampels,):** 
 125                An array containing unique sample identifiers.
 126        """
 127        return self.__samples
 128
 129    @samples.setter
 130    def samples(self, x: Union[List, np.ndarray]):
 131        """
 132        Update `samples`.
 133        """
 134        self.__samples = np.asarray(x)
 135
 136    @property
 137    def variants_ref(self) -> Optional[np.ndarray]:
 138        """
 139        Retrieve `variants_ref`.
 140
 141        Returns:
 142            **array of shape (n_snps,):** An array containing the reference allele for each SNP.
 143        """
 144        return self.__variants_ref
 145
 146    @variants_ref.setter
 147    def variants_ref(self, x: np.ndarray):
 148        """
 149        Update `variants_ref`.
 150        """
 151        self.__variants_ref = x
 152
 153    @property
 154    def variants_alt(self) -> Optional[np.ndarray]:
 155        """
 156        Retrieve `variants_alt`.
 157
 158        Returns:
 159            **array of shape (n_snps,):** An array containing the alternate allele for each SNP.
 160        """
 161        return self.__variants_alt
 162
 163    @variants_alt.setter
 164    def variants_alt(self, x: np.ndarray):
 165        """
 166        Update `variants_alt`.
 167        """
 168        self.__variants_alt = x
 169
 170    @property
 171    def variants_chrom(self) -> Optional[np.ndarray]:
 172        """
 173        Retrieve `variants_chrom`.
 174
 175        Returns:
 176            **array of shape (n_snps,):** An array containing the chromosome for each SNP.
 177        """
 178        return self.__variants_chrom
 179
 180    @variants_chrom.setter
 181    def variants_chrom(self, x: np.ndarray):
 182        """
 183        Update `variants_chrom`.
 184        """
 185        self.__variants_chrom = x
 186
 187    @property
 188    def variants_filter_pass(self) -> Optional[np.ndarray]:
 189        """
 190        Retrieve `variants_filter_pass`.
 191
 192        Returns:
 193            **array of shape (n_snps,):** An array indicating whether each SNP passed control checks.
 194        """
 195        return self.__variants_filter_pass
 196
 197    @variants_filter_pass.setter
 198    def variants_filter_pass(self, x: np.ndarray):
 199        """
 200        Update `variants_filter_pass`.
 201        """
 202        self.__variants_filter_pass = x
 203
 204    @property
 205    def variants_id(self) -> Optional[np.ndarray]:
 206        """
 207        Retrieve `variants_id`.
 208
 209        Returns:
 210            **array of shape (n_snps,):** An array containing unique identifiers (IDs) for each SNP.
 211        """
 212        return self.__variants_id
 213
 214    @variants_id.setter
 215    def variants_id(self, x: np.ndarray):
 216        """
 217        Update `variants_id`.
 218        """
 219        self.__variants_id = x
 220
 221    @property
 222    def variants_pos(self) -> Optional[np.ndarray]:
 223        """
 224        Retrieve `variants_pos`.
 225
 226        Returns:
 227            **array of shape (n_snps,):** An array containing the chromosomal positions for each SNP.
 228        """
 229        return self.__variants_pos
 230
 231    @variants_pos.setter
 232    def variants_pos(self, x: np.ndarray):
 233        """
 234        Update `variants_pos`.
 235        """
 236        self.__variants_pos = x
 237
 238    @property
 239    def variants_qual(self) -> Optional[np.ndarray]:
 240        """
 241        Retrieve `variants_qual`.
 242
 243        Returns:
 244            **array of shape (n_snps,):** An array containing the Phred-scaled quality score for each SNP.
 245        """
 246        return self.__variants_qual
 247
 248    @variants_qual.setter
 249    def variants_qual(self, x: np.ndarray):
 250        """
 251        Update `variants_qual`.
 252        """
 253        self.__variants_qual = x
 254
 255    @property
 256    def calldata_lai(self) -> Optional[np.ndarray]:
 257        """
 258        Retrieve `calldata_lai`.
 259
 260        Returns:
 261            **array:** 
 262                An array containing the ancestry for each SNP. This array can be either 2D with shape
 263                `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2).
 264        """
 265        return self.__calldata_lai
 266
 267    @calldata_lai.setter
 268    def calldata_lai(self, x: np.ndarray):
 269        """
 270        Update `calldata_lai`.
 271        """
 272        self.__calldata_lai = x
 273
 274    @property
 275    def ancestry_map(self) -> Optional[Dict[str, str]]:
 276        """
 277        Retrieve `ancestry_map`.
 278
 279        Returns:
 280            **dict of str to str:** A dictionary mapping ancestry codes to region names.
 281        """
 282        return self.__ancestry_map
 283
 284    @ancestry_map.setter
 285    def ancestry_map(self, x):
 286        """
 287        Update `ancestry_map`.
 288        """
 289        self.__ancestry_map = x
 290
 291    @property
 292    def n_samples(self) -> int:
 293        """
 294        Retrieve `n_samples`.
 295
 296        Returns:
 297            **int:** The total number of samples.
 298        """
 299        if self.__samples is not None:
 300            return len(self.__samples)
 301        elif self.__calldata_gt is not None:
 302            return self.__calldata_gt.shape[1]
 303        elif self.__calldata_lai is not None:
 304            if self.__calldata_lai.ndim == 2:
 305                return self.__calldata_lai.shape[1] // 2
 306            elif self.__calldata_lai.ndim == 3:
 307                return self.__calldata_lai.shape[1]
 308        else:
 309            raise ValueError("Unable to determine the total number of samples: no relevant data is available.")
 310
 311    @property
 312    def n_snps(self) -> int:
 313        """
 314        Retrieve `n_snps`.
 315
 316        Returns:
 317            **int:** The total number of SNPs.
 318        """
 319        # List of attributes that can indicate the number of SNPs
 320        potential_attributes = [
 321            self.__calldata_gt,
 322            self.__variants_ref,
 323            self.__variants_alt,
 324            self.__variants_chrom,
 325            self.__variants_filter_pass,
 326            self.__variants_id,
 327            self.__variants_pos,
 328            self.__variants_qual,
 329            self.__calldata_lai
 330        ]
 331
 332        # Check each attribute for its first dimension, which corresponds to `n_snps`
 333        for attr in potential_attributes:
 334            if attr is not None:
 335                return attr.shape[0]
 336
 337        raise ValueError("Unable to determine the total number of SNPs: no relevant data is available.")
 338
 339    @property
 340    def n_chrom(self) -> Optional[int]:
 341        """
 342        Retrieve `n_chrom`.
 343
 344        Returns:
 345            **int:** The total number of unique chromosomes in `variants_chrom`.
 346        """
 347        if self.variants_chrom is None:
 348            warnings.warn("Chromosome data `variants_chrom` is None.")
 349            return None
 350
 351        return len(self.unique_chrom)
 352
 353    @property
 354    def n_ancestries(self) -> int:
 355        """
 356        Retrieve `n_ancestries`.
 357
 358        Returns:
 359            **int:** The total number of unique ancestries.
 360        """
 361        if self.__calldata_lai is not None:
 362            return len(np.unique(self.__calldata_lai))
 363        else:
 364            raise ValueError("Unable to determine the total number of ancestries: no relevant data is available.")
 365
 366    @property
 367    def unique_chrom(self) -> Optional[np.ndarray]:
 368        """
 369        Retrieve `unique_chrom`.
 370
 371        Returns:
 372            **array:** The unique chromosome names in `variants_chrom`, preserving their order of appearance.
 373        """
 374        if self.variants_chrom is None:
 375            warnings.warn("Chromosome data `variants_chrom` is None.")
 376            return None
 377
 378        # Identify unique chromosome names and their first indexes of occurrence
 379        _, idx = np.unique(self.variants_chrom, return_index=True)
 380        # Return chromosome names sorted by their first occurrence to maintain original order
 381        return self.variants_chrom[np.sort(idx)]
 382
 383    @property
 384    def are_strands_summed(self) -> bool:
 385        """
 386        Retrieve `are_strands_summed`.
 387        
 388        Returns:
 389            **bool:** 
 390                True if the maternal and paternal strands have been summed together, which is indicated by 
 391                `calldata_gt` having shape `(n_samples, n_snps)`. False if the strands are stored separately, 
 392                indicated by `calldata_gt` having shape `(n_samples, n_snps, 2)`.
 393        """
 394        if self.calldata_gt is None:
 395            warnings.warn("Genotype data `calldata_gt` is None.")
 396            return None
 397        
 398        return self.calldata_gt.ndim == 2
 399
 400    def copy(self) -> 'SNPObject':
 401        """
 402        Create and return a copy of `self`.
 403
 404        Returns:
 405            **SNPObject:** 
 406                A new instance of the current object.
 407        """
 408        return copy.deepcopy(self)
 409
 410    def keys(self) -> List[str]:
 411        """
 412        Retrieve a list of public attribute names for `self`.
 413
 414        Returns:
 415            **list of str:** 
 416                A list of attribute names, with internal name-mangling removed, 
 417                for easier reference to public attributes in the instance.
 418        """
 419        return [attr.replace('_SNPObject__', '') for attr in vars(self)]
 420
 421    def sum_strands(self, inplace: bool = False) -> Optional['SNPObject']:
 422        """
 423        Sum paternal and maternal strands.
 424
 425        Args:
 426            inplace (bool, default=False): 
 427                If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 
 428                filtered. Default is False.
 429
 430        Returns:
 431            **Optional[SNPObject]:** 
 432                A new `SNPObject` with summed strands if `inplace=False`. 
 433                If `inplace=True`, modifies `self` in place and returns None.
 434        """
 435        if self.calldata_gt is None:
 436            warnings.warn("Genotype data `calldata_gt` is None.")
 437            return None if not inplace else self
 438
 439        if self.are_strands_summed:
 440            warnings.warn("Genotype data `calldata_gt` is already summed.")
 441            return self if inplace else self.copy()
 442        
 443        if inplace:
 444            self.calldata_gt = self.calldata_gt.sum(axis=2, dtype=np.int8)
 445            return self
 446        else:
 447            snpobj = self.copy()
 448            snpobj.calldata_gt = self.calldata_gt.sum(axis=2, dtype=np.int8)
 449            return snpobj
 450
 451    def filter_variants(
 452            self, 
 453            chrom: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 
 454            pos: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 
 455            indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 
 456            include: bool = True, 
 457            inplace: bool = False
 458        ) -> Optional['SNPObject']:
 459        """
 460        Filter variants based on specified chromosome names, variant positions, or variant indexes.
 461
 462        This method updates the `calldata_gt`, `variants_ref`, `variants_alt`, 
 463        `variants_chrom`, `variants_filter_pass`, `variants_id`, `variants_pos`,  
 464        `variants_qual`, and `lai` attributes to include or exclude the specified variants. The filtering 
 465        criteria can be based on chromosome names, variant positions, or indexes. If multiple 
 466        criteria are provided, their union is used for filtering. The order of the variants is preserved.
 467        
 468        Negative indexes are supported and follow 
 469        [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html).
 470
 471        Args:
 472            chrom (str or array_like of str, optional): 
 473                Chromosome(s) to filter variants by. Can be a single chromosome as a string or a sequence 
 474                of chromosomes. If both `chrom` and `pos` are provided, they must either have matching lengths 
 475                (pairing each chromosome with a position) or `chrom` should be a single value that applies to 
 476                all positions in `pos`. Default is None. 
 477            pos (int or array_like of int, optional): 
 478                Position(s) to filter variants by. Can be a single position as an integer or a sequence of positions. 
 479                If `chrom` is also provided, `pos` should either match `chrom` in length or `chrom` should be a 
 480                single value. Default is None.
 481            indexes (int or array_like of int, optional): 
 482                Index(es) of the variants to include or exclude. Can be a single index or a sequence
 483                of indexes. Negative indexes are supported. Default is None.
 484            include (bool, default=True): 
 485                If True, includes only the specified variants. If False, excludes the specified
 486                variants. Default is True.
 487            inplace (bool, default=False): 
 488                If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 
 489                filtered. Default is False.
 490
 491        Returns:
 492            **Optional[SNPObject]:** 
 493                A new `SNPObject` with the specified variants filtered if `inplace=False`. 
 494                If `inplace=True`, modifies `self` in place and returns None.
 495        """
 496        if chrom is None and pos is None and indexes is None:
 497            raise ValueError("At least one of 'chrom', 'pos', or 'indexes' must be provided.")
 498
 499        n_snps = self.n_snps
 500
 501        # Convert inputs to arrays for consistency
 502        chrom = np.atleast_1d(chrom) if chrom is not None else None
 503        pos = np.atleast_1d(pos) if pos is not None else None
 504        indexes = np.atleast_1d(indexes) if indexes is not None else None
 505
 506        # Validate chrom and pos lengths if both are provided
 507        if chrom is not None and pos is not None:
 508            if len(chrom) != len(pos) and len(chrom) > 1:
 509                raise ValueError(
 510                    "When both 'chrom' and 'pos' are provided, they must either be of the same length "
 511                    "or 'chrom' must be a single value."
 512                )
 513
 514        # Create a mask for chromosome and position filtering
 515        mask_combined = np.zeros(n_snps, dtype=bool)
 516        if chrom is not None and pos is not None:
 517            if len(chrom) == 1:
 518                # Apply single chromosome to all positions in `pos`
 519                mask_combined = (self['variants_chrom'] == chrom[0]) & np.isin(self['variants_pos'], pos)
 520            else:
 521                # Vectorized pair matching for chrom and pos
 522                query_pairs = np.array(
 523                    list(zip(chrom, pos)),
 524                    dtype=[
 525                        ('chrom', self['variants_chrom'].dtype),
 526                        ('pos', self['variants_pos'].dtype)
 527                    ]
 528                )
 529                data_pairs = np.array(
 530                    list(zip(self['variants_chrom'], self['variants_pos'])),
 531                    dtype=[
 532                        ('chrom', self['variants_chrom'].dtype),
 533                        ('pos', self['variants_pos'].dtype)
 534                    ]
 535                )
 536                mask_combined = np.isin(data_pairs, query_pairs)
 537
 538        elif chrom is not None:
 539            # Only chromosome filtering
 540            mask_combined = np.isin(self['variants_chrom'], chrom)
 541        elif pos is not None:
 542            # Only position filtering
 543            mask_combined = np.isin(self['variants_pos'], pos)
 544
 545        # Create mask based on indexes if provided
 546        if indexes is not None:
 547            # Validate indexes, allowing negative indexes
 548            out_of_bounds_indexes = indexes[(indexes < -n_snps) | (indexes >= n_snps)]
 549            if out_of_bounds_indexes.size > 0:
 550                raise ValueError(f"One or more sample indexes are out of bounds.")
 551
 552            # Handle negative indexes and check for out-of-bounds indexes
 553            adjusted_indexes = np.mod(indexes, n_snps)
 554
 555            # Create mask for specified indexes
 556            mask_indexes = np.zeros(n_snps, dtype=bool)
 557            mask_indexes[adjusted_indexes] = True
 558
 559            # Combine with `chrom` and `pos` mask using logical OR (union of all specified criteria)
 560            mask_combined = mask_combined | mask_indexes
 561
 562        # Invert mask if `include` is False
 563        if not include:
 564            mask_combined = ~mask_combined
 565
 566        # Define keys to filter
 567        keys = [
 568            'calldata_gt', 'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 
 569            'variants_id', 'variants_pos', 'variants_qual', 'calldata_lai'
 570        ]
 571
 572        # Apply filtering based on inplace parameter
 573        if inplace:
 574            for key in keys:
 575                if self[key] is not None:
 576                    if self[key].ndim > 1:
 577                        self[key] = np.asarray(self[key])[mask_combined, ...]
 578                    else:
 579                        self[key] = np.asarray(self[key])[mask_combined]
 580
 581            return None
 582        else:
 583            # Create A new `SNPObject` with filtered data
 584            snpobj = self.copy()
 585            for key in keys:
 586                if snpobj[key] is not None:
 587                    if snpobj[key].ndim > 1:
 588                        snpobj[key] = np.asarray(snpobj[key])[mask_combined, ...]
 589                    else:
 590                        snpobj[key] = np.asarray(snpobj[key])[mask_combined]
 591
 592            return snpobj
 593
 594    def filter_samples(
 595            self, 
 596            samples: Optional[Union[str, Sequence[str], np.ndarray, None]] = None,
 597            indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None,
 598            include: bool = True,
 599            reorder: bool = False,
 600            inplace: bool = False
 601        ) -> Optional['SNPObject']:
 602        """
 603        Filter samples based on specified names or indexes.
 604
 605        This method updates the `samples` and `calldata_gt` attributes to include or exclude the specified 
 606        samples. The order of the samples is preserved. Set `reorder=True` to match the ordering of the
 607        provided `samples` and/or `indexes` lists when including.
 608
 609        If both samples and indexes are provided, any sample matching either a name in samples or an index in 
 610        indexes will be included or excluded.
 611
 612        This method allows inclusion or exclusion of specific samples by their names or 
 613        indexes. When both sample names and indexes are provided, the union of the specified samples 
 614        is used. Negative indexes are supported and follow 
 615        [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html).
 616
 617        Args:
 618            samples (str or array_like of str, optional): 
 619                 Name(s) of the samples to include or exclude. Can be a single sample name or a
 620                 sequence of sample names. Default is None.
 621            indexes (int or array_like of int, optional):
 622                Index(es) of the samples to include or exclude. Can be a single index or a sequence
 623                of indexes. Negative indexes are supported. Default is None.
 624            include (bool, default=True): 
 625                If True, includes only the specified samples. If False, excludes the specified
 626                samples. Default is True.
 627            inplace (bool, default=False): 
 628                If True, modifies `self` in place. If False, returns a new `SNPObject` with the samples 
 629                filtered. Default is False.
 630
 631        Returns:
 632            **Optional[SNPObject]:** 
 633                A new `SNPObject` with the specified samples filtered if `inplace=False`. 
 634                If `inplace=True`, modifies `self` in place and returns None.
 635        """
 636        if samples is None and indexes is None:
 637            raise ValueError("At least one of 'samples' or 'indexes' must be provided.")
 638
 639        n_samples = self.n_samples
 640        sample_names = np.array(self['samples'])
 641
 642        # Create mask based on sample names
 643        if samples is not None:
 644            samples = np.asarray(samples).ravel()
 645            mask_samples = np.isin(sample_names, samples)
 646            missing_samples = samples[~np.isin(samples, sample_names)]
 647            if missing_samples.size > 0:
 648                raise ValueError(f"The following specified samples were not found: {missing_samples.tolist()}")
 649        else:
 650            mask_samples = np.zeros(n_samples, dtype=bool)
 651
 652        # Create mask based on sample indexes
 653        if indexes is not None:
 654            indexes = np.asarray(indexes).ravel()
 655
 656            # Validate indexes, allowing negative indexes
 657            out_of_bounds_indexes = indexes[(indexes < -n_samples) | (indexes >= n_samples)]
 658            if out_of_bounds_indexes.size > 0:
 659                raise ValueError(f"One or more sample indexes are out of bounds.")
 660            
 661            # Handle negative indexes
 662            adjusted_indexes = np.mod(indexes, n_samples)
 663
 664            mask_indexes = np.zeros(n_samples, dtype=bool)
 665            mask_indexes[adjusted_indexes] = True
 666        else:
 667            mask_indexes = np.zeros(n_samples, dtype=bool)
 668
 669        # Combine masks using logical OR (union of samples)
 670        mask_combined = mask_samples | mask_indexes
 671
 672        if not include:
 673            mask_combined = ~mask_combined
 674
 675        # If requested, compute an ordering of selected samples that follows the provided lists.
 676        ordered_indices = None
 677        if include and reorder:
 678            sel_indices = np.where(mask_combined)[0]
 679            ordered_list: List[int] = []
 680            added = np.zeros(n_samples, dtype=bool)
 681
 682            # Prioritize the order in `samples`
 683            if samples is not None:
 684                name_to_idx = {name: idx for idx, name in enumerate(sample_names)}
 685                for s in samples:
 686                    idx = name_to_idx.get(s)
 687                    if idx is not None and mask_combined[idx] and not added[idx]:
 688                        ordered_list.append(idx)
 689                        added[idx] = True
 690
 691            # Then respect the order in `indexes`
 692            if indexes is not None:
 693                adj_idx = np.mod(np.atleast_1d(indexes), n_samples)
 694                for idx in adj_idx:
 695                    if mask_combined[idx] and not added[idx]:
 696                        ordered_list.append(int(idx))
 697                        added[idx] = True
 698
 699            # Finally, append any remaining selected samples in their original order
 700            for idx in sel_indices:
 701                if not added[idx]:
 702                    ordered_list.append(int(idx))
 703
 704            ordered_indices = np.asarray(ordered_list, dtype=int)
 705
 706        # Define keys to filter
 707        keys = ['samples', 'calldata_gt', 'calldata_lai']
 708
 709        # Apply filtering based on inplace parameter
 710        if inplace:
 711            for key in keys:
 712                if self[key] is not None:
 713                    arr = np.asarray(self[key])
 714                    if ordered_indices is not None:
 715                        if key == 'calldata_lai' and arr.ndim == 2:
 716                            # Haplotype-aware reordering for 2D LAI (n_snps, n_samples*2)
 717                            hap_idx = np.concatenate([2*ordered_indices, 2*ordered_indices + 1])
 718                            self[key] = arr[:, hap_idx]
 719                        elif arr.ndim > 1:
 720                            self[key] = arr[:, ordered_indices, ...]
 721                        else:
 722                            self[key] = arr[ordered_indices]
 723                    else:
 724                        if arr.ndim > 1:
 725                            self[key] = arr[:, mask_combined, ...]
 726                        else:
 727                            self[key] = arr[mask_combined]
 728
 729            return None
 730        else:
 731            # Create A new `SNPObject` with filtered data
 732            snpobj = self.copy()
 733            for key in keys:
 734                if snpobj[key] is not None:
 735                    arr = np.asarray(snpobj[key])
 736                    if ordered_indices is not None:
 737                        if key == 'calldata_lai' and arr.ndim == 2:
 738                            hap_idx = np.concatenate([2*ordered_indices, 2*ordered_indices + 1])
 739                            snpobj[key] = arr[:, hap_idx]
 740                        elif arr.ndim > 1:
 741                            snpobj[key] = arr[:, ordered_indices, ...]
 742                        else:
 743                            snpobj[key] = arr[ordered_indices]
 744                    else:
 745                        if arr.ndim > 1:
 746                            snpobj[key] = arr[:, mask_combined, ...]
 747                        else:
 748                            snpobj[key] = arr[mask_combined]
 749            return snpobj
 750
 751    def detect_chromosome_format(self) -> str:
 752        """
 753        Detect the chromosome naming convention in `variants_chrom` based on the prefix 
 754        of the first chromosome identifier in `unique_chrom`.
 755        
 756        **Recognized formats:**
 757
 758        - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.
 759        - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.
 760        - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.
 761        - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
 762        
 763        If the format does not match any recognized pattern, `'Unknown format'` is returned.
 764
 765        Returns:
 766            **str:** 
 767                A string indicating the detected chromosome format (`'chr'`, `'chm'`, `'chrom'`, or `'plain'`).
 768                If no recognized format is matched, returns `'Unknown format'`.
 769        """
 770        # Select the first unique chromosome identifier for format detection
 771        chromosome_str = self.unique_chrom[0]
 772
 773        # Define regular expressions to match each recognized chromosome format
 774        patterns = {
 775            'chr': r'^chr(\d+|X|Y|M)$',    # Matches 'chr' prefixed format
 776            'chm': r'^chm(\d+|X|Y|M)$',    # Matches 'chm' prefixed format
 777            'chrom': r'^chrom(\d+|X|Y|M)$', # Matches 'chrom' prefixed format
 778            'plain': r'^(\d+|X|Y|M)$'       # Matches plain format without prefix
 779        }
 780
 781        # Iterate through the patterns to identify the chromosome format
 782        for prefix, pattern in patterns.items():
 783            if re.match(pattern, chromosome_str):
 784                return prefix  # Return the recognized format prefix
 785
 786        # If no pattern matches, return 'Unknown format'
 787        return 'Unknown format'
 788
 789    def convert_chromosome_format(
 790        self, 
 791        from_format: str, 
 792        to_format: str, 
 793        inplace: bool = False
 794    ) -> Optional['SNPObject']:
 795        """
 796        Convert the chromosome format from one naming convention to another in `variants_chrom`.
 797
 798        **Supported formats:**
 799
 800        - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.
 801        - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.
 802        - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.
 803        - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
 804
 805        Args:
 806            from_format (str): 
 807                The current chromosome format. Acceptable values are `'chr'`, `'chm'`, `'chrom'`, or `'plain'`.
 808            to_format (str): 
 809                The target format for chromosome data conversion. Acceptable values match `from_format` options.
 810            inplace (bool, default=False): 
 811                If True, modifies `self` in place. If False, returns a new `SNPObject` with the converted format.
 812                Default is False.
 813
 814        Returns:
 815            **Optional[SNPObject]:** A new `SNPObject` with the converted chromosome format if `inplace=False`. 
 816            If `inplace=True`, modifies `self` in place and returns None.
 817        """
 818        # Define the list of standard chromosome identifiers
 819        chrom_list = [*map(str, range(1, 23)), 'X', 'Y', 'M']  # M for mitochondrial chromosomes
 820
 821        # Format mappings for different chromosome naming conventions
 822        format_mappings = {
 823            'chr': [f'chr{i}' for i in chrom_list],
 824            'chm': [f'chm{i}' for i in chrom_list],
 825            'chrom': [f'chrom{i}' for i in chrom_list],
 826            'plain': chrom_list,
 827        }
 828
 829        # Verify that from_format and to_format are valid naming conventions
 830        if from_format not in format_mappings or to_format not in format_mappings:
 831            raise ValueError(f"Invalid format: {from_format} or {to_format}. Must be one of {list(format_mappings.keys())}.")
 832
 833        # Convert chromosomes to string for consistent comparison
 834        variants_chrom = self['variants_chrom'].astype(str)
 835
 836        # Verify that all chromosomes in the object follow the specified `from_format`
 837        expected_chroms = set(format_mappings[from_format])
 838        mismatched_chroms = set(variants_chrom) - expected_chroms
 839
 840        if mismatched_chroms:
 841            raise ValueError(f"The following chromosomes do not match the `from_format` '{from_format}': {mismatched_chroms}.")
 842
 843        # Create conditions for selecting based on current `from_format` names
 844        conditions = [variants_chrom == chrom for chrom in format_mappings[from_format]]
 845
 846        # Rename chromosomes based on inplace flag
 847        if inplace:
 848            self['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown')
 849            return None
 850        else:
 851            snpobject = self.copy()
 852            snpobject['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown')
 853            return snpobject
 854
 855    def match_chromosome_format(self, snpobj: 'SNPObject', inplace: bool = False) -> Optional['SNPObject']:
 856        """
 857        Convert the chromosome format in `variants_chrom` from `self` to match the format of a reference `snpobj`.
 858
 859        **Recognized formats:**
 860
 861        - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.
 862        - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.
 863        - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.
 864        - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
 865
 866        Args:
 867            snpobj (SNPObject): 
 868                The reference SNPObject whose chromosome format will be matched.
 869            inplace (bool, default=False): 
 870                If True, modifies `self` in place. If False, returns a new `SNPObject` with the 
 871                chromosome format matching that of `snpobj`. Default is False.
 872
 873        Returns:
 874            **Optional[SNPObject]:** 
 875                A new `SNPObject` with matched chromosome format if `inplace=False`. 
 876                If `inplace=True`, modifies `self` in place and returns None.
 877        """
 878        # Detect the chromosome naming format of the current SNPObject
 879        fmt1 = self.detect_chromosome_format()
 880        if fmt1 == 'Unknown format':
 881            raise ValueError("The chromosome format of the current SNPObject is unrecognized.")
 882        
 883        # Detect the chromosome naming format of the reference SNPObject
 884        fmt2 = snpobj.detect_chromosome_format()
 885        if fmt2 == 'Unknown format':
 886            raise ValueError("The chromosome format of the reference SNPObject is unrecognized.")
 887
 888        # Convert the current SNPObject's chromosome format to match the reference format
 889        return self.convert_chromosome_format(fmt1, fmt2, inplace=inplace)
 890
 891    def rename_chrom(
 892        self,
 893        to_replace: Union[Dict[str, str], str, List[str]] = {'^([0-9]+)$': r'chr\1', r'^chr([0-9]+)$': r'\1'},
 894        value: Optional[Union[str, List[str]]] = None,
 895        regex: bool = True,
 896        inplace: bool = False
 897    ) -> Optional['SNPObject']:
 898        """
 899        Replace chromosome values in `variants_chrom` using patterns or exact matches.
 900
 901        This method allows flexible chromosome replacements, using regex or exact matches, useful 
 902        for non-standard chromosome formats. For standard conversions (e.g., 'chr1' to '1'), 
 903        consider `convert_chromosome_format`.
 904
 905        Args:
 906            to_replace (dict, str, or list of str): 
 907                Pattern(s) or exact value(s) to be replaced in chromosome names. Default behavior 
 908                transforms `<chrom_num>` to `chr<chrom_num>` or vice versa. Non-matching values 
 909                remain unchanged.
 910                - If str or list of str: Matches will be replaced with `value`.
 911                - If regex (bool), then any regex matches will be replaced with `value`.
 912                - If dict: Keys defines values to replace, with corresponding replacements as values.
 913            value (str or list of str, optional): 
 914                Replacement value(s) if `to_replace` is a string or list. Ignored if `to_replace` 
 915                is a dictionary.
 916            regex (bool, default=True): 
 917                If True, interprets `to_replace` keys as regex patterns.
 918            inplace (bool, default=False): 
 919                If True, modifies `self` in place. If False, returns a new `SNPObject` with the chromosomes
 920                renamed. Default is False.
 921
 922        Returns:
 923            **Optional[SNPObject]:** A new `SNPObject` with the renamed chromosome format if `inplace=False`. 
 924            If `inplace=True`, modifies `self` in place and returns None.
 925        """
 926        # Standardize input format: convert `to_replace` and `value` to a dictionary if needed
 927        if isinstance(to_replace, (str, int)):
 928            to_replace = [to_replace]
 929        if isinstance(value, (str, int)):
 930            value = [value]
 931        if isinstance(to_replace, list) and isinstance(value, list):
 932            dictionary = dict(zip(to_replace, value))
 933        elif isinstance(to_replace, dict) and value is None:
 934            dictionary = to_replace
 935        else:
 936            raise ValueError(
 937            "Invalid input: `to_replace` and `value` must be compatible types (both str, list of str, or dict)."
 938        )
 939
 940        # Vectorized function for replacing values in chromosome array
 941        vec_replace_values = np.vectorize(self._match_to_replace)
 942
 943        # Rename chromosomes based on inplace flag
 944        if inplace:
 945            self.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex)
 946            return None
 947        else:
 948            snpobj = self.copy()
 949            snpobj.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex)
 950            return snpobj
 951
 952    def rename_missings(
 953        self, 
 954        before: Union[int, float, str] = -1, 
 955        after: Union[int, float, str] = '.', 
 956        inplace: bool = False
 957    ) -> Optional['SNPObject']:
 958        """
 959        Replace missing values in the `calldata_gt` attribute.
 960
 961        This method identifies missing values in 'calldata_gt' and replaces them with a specified 
 962        value. By default, it replaces occurrences of `-1` (often used to signify missing data) with `'.'`.
 963
 964        Args:
 965            before (int, float, or str, default=-1): 
 966                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
 967                Default is -1.
 968            after (int, float, or str, default='.'): 
 969                The value that will replace `before`. Default is '.'.
 970            inplace (bool, default=False): 
 971                If True, modifies `self` in place. If False, returns a new `SNPObject` with the applied 
 972                replacements. Default is False.
 973
 974        Returns:
 975            **Optional[SNPObject]:** 
 976                A new `SNPObject` with the renamed missing values if `inplace=False`. 
 977                If `inplace=True`, modifies `self` in place and returns None.
 978        """
 979        # Rename missing values in the `calldata_gt` attribute based on inplace flag
 980        if inplace:
 981            self['calldata_gt'] = np.where(self['calldata_gt'] == before, after, self['calldata_gt'])
 982            return None
 983        else:
 984            snpobj = self.copy()
 985            snpobj['calldata_gt'] = np.where(snpobj['calldata_gt'] == before, after, snpobj['calldata_gt'])
 986            return snpobj
 987
 988    def get_common_variants_intersection(
 989        self, 
 990        snpobj: 'SNPObject', 
 991        index_by: str = 'pos'
 992    ) -> Tuple[List[str], np.ndarray, np.ndarray]:
 993        """
 994        Identify common variants between `self` and the `snpobj` instance based on the specified `index_by` criterion, 
 995        which may match based on chromosome and position (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both.
 996
 997        This method returns the identifiers of common variants and their corresponding indices in both objects.
 998
 999        Args:
1000            snpobj (SNPObject): 
1001                The reference SNPObject to compare against.
1002            index_by (str, default='pos'): 
1003                Criteria for matching variants. Options:
1004                - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'.
1005                - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'.
1006                - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'.
1007                Default is 'pos'.
1008
1009        Returns:
1010            Tuple containing:
1011            - **list of str:** A list of common variant identifiers (as strings).
1012            - **array:** An array of indices in `self` where common variants are located.
1013            - **array:** An array of indices in `snpobj` where common variants are located.
1014        """
1015        # Create unique identifiers for each variant in both SNPObjects based on the specified criterion
1016        if index_by == 'pos':
1017            query_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(self['variants_chrom'], self['variants_pos'])]
1018            reference_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(snpobj['variants_chrom'], snpobj['variants_pos'])]
1019        elif index_by == 'id':
1020            query_identifiers = self['variants_id'].tolist()
1021            reference_identifiers = snpobj['variants_id'].tolist()
1022        elif index_by == 'pos+id':
1023            query_identifiers = [
1024                f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(self['variants_chrom'], self['variants_pos'], self['variants_id'])
1025            ]
1026            reference_identifiers = [
1027                f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_id'])
1028            ]
1029        else:
1030            raise ValueError("`index_by` must be one of 'pos', 'id', or 'pos+id'.")
1031
1032        # Convert to sets for intersection
1033        common_ids = set(query_identifiers).intersection(reference_identifiers)
1034
1035        # Collect indices for common identifiers
1036        query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids]
1037        reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids]
1038
1039        return list(common_ids), np.array(query_idx), np.array(reference_idx)
1040
1041    def get_common_markers_intersection(
1042        self, 
1043        snpobj: 'SNPObject'
1044    ) -> Tuple[List[str], np.ndarray, np.ndarray]:
1045        """
1046        Identify common markers between between `self` and the `snpobj` instance. Common markers are identified 
1047        based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 
1048        and alternate (`variants_alt`) alleles.
1049
1050        This method returns the identifiers of common markers and their corresponding indices in both objects.
1051        
1052        Args:
1053            snpobj (SNPObject): 
1054                The reference SNPObject to compare against.
1055        
1056        Returns:
1057            Tuple containing:
1058            - **list of str:** A list of common variant identifiers (as strings).
1059            - **array:** An array of indices in `self` where common variants are located.
1060            - **array:** An array of indices in `snpobj` where common variants are located.
1061        """
1062        # Generate unique identifiers based on chrom, pos, ref, and alt alleles
1063        query_identifiers = [
1064            f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 
1065            zip(self['variants_chrom'], self['variants_pos'], self['variants_ref'], self['variants_alt'])
1066        ]
1067        reference_identifiers = [
1068            f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 
1069            zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_ref'], snpobj['variants_alt'])
1070        ]
1071
1072        # Convert to sets for intersection
1073        common_ids = set(query_identifiers).intersection(reference_identifiers)
1074
1075        # Collect indices for common identifiers in both SNPObjects
1076        query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids]
1077        reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids]
1078
1079        return list(common_ids), np.array(query_idx), np.array(reference_idx)
1080
1081    def subset_to_common_variants(
1082        self, 
1083        snpobj: 'SNPObject', 
1084        index_by: str = 'pos', 
1085        common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None,
1086        inplace: bool = False
1087    ) -> Optional['SNPObject']:
1088        """
1089        Subset `self` to include only the common variants with a reference `snpobj` based on 
1090        the specified `index_by` criterion, which may match based on chromosome and position 
1091        (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both.
1092        
1093        Args:
1094            snpobj (SNPObject): 
1095                The reference SNPObject to compare against.
1096            index_by (str, default='pos'): 
1097                Criteria for matching variants. Options:
1098                - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'.
1099                - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'.
1100                - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'.
1101                Default is 'pos'.
1102            common_variants_intersection (Tuple[np.ndarray, np.ndarray], optional): 
1103                Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 
1104                computed within the function.
1105            inplace (bool, default=False): 
1106                If True, modifies `self` in place. If False, returns a new `SNPObject` with the common variants
1107                subsetted. Default is False.
1108
1109        Returns:
1110            **Optional[SNPObject]:** 
1111                A new `SNPObject` with the common variants subsetted if `inplace=False`. 
1112                If `inplace=True`, modifies `self` in place and returns None.
1113        """
1114        # Get indices of common variants if not provided
1115        if common_variants_intersection is None:
1116            _, query_idx, _ = self.get_common_variants_intersection(snpobj, index_by=index_by)
1117        else:
1118            query_idx, _ = common_variants_intersection
1119
1120        # Use filter_variants method with the identified indices, applying `inplace` as specified
1121        return self.filter_variants(indexes=query_idx, include=True, inplace=inplace)
1122
1123    def subset_to_common_markers(
1124        self, 
1125        snpobj: 'SNPObject', 
1126        common_markers_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None,
1127        inplace: bool = False
1128    ) -> Optional['SNPObject']:
1129        """
1130        Subset `self` to include only the common markers with a reference `snpobj`. Common markers are identified 
1131        based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 
1132        and alternate (`variants_alt`) alleles.
1133
1134        Args:
1135            snpobj (SNPObject): 
1136                The reference SNPObject to compare against.
1137            common_markers_intersection (tuple of arrays, optional): 
1138                Precomputed indices of common markers between `self` and `snpobj`. If None, intersection is 
1139                computed within the function.
1140            inplace (bool, default=False): 
1141                If True, modifies `self` in place. If False, returns a new `SNPObject` with the common markers
1142                subsetted. Default is False.
1143
1144        Returns:
1145            **Optional[SNPObject]:** 
1146                A new `SNPObject` with the common markers subsetted if `inplace=False`. 
1147                If `inplace=True`, modifies `self` in place and returns None.
1148        """
1149        # Get indices of common markers if not provided
1150        if common_markers_intersection is None:
1151            _, query_idx, _ = self.get_common_markers_intersection(snpobj)
1152        else:
1153            query_idx, _ = common_markers_intersection
1154
1155        # Use filter_variants method with the identified indices, applying `inplace` as specified
1156        return self.filter_variants(indexes=query_idx, include=True, inplace=inplace)
1157
1158    def merge(
1159            self, 
1160            snpobj: 'SNPObject', 
1161            force_samples: bool = False, 
1162            prefix: str = '2', 
1163            inplace: bool = False
1164        ) -> Optional['SNPObject']:
1165        """
1166        Merge `self` with `snpobj` along the sample axis.
1167
1168        This method expects both SNPObjects to contain the same set of SNPs in the same order, 
1169        then combines their genotype (`calldata_gt`) and LAI (`calldata_lai`) arrays by 
1170        concatenating the sample dimension. Samples from `snpobj` are appended to those in `self`.
1171
1172        Args:
1173            snpobj (SNPObject): 
1174                The SNPObject to merge samples with.
1175            force_samples (bool, default=False): 
1176                If True, duplicate sample names are resolved by prepending the `prefix` to duplicate sample names in 
1177                `snpobj`. Otherwise, merging fails when duplicate sample names are found. Default is False.
1178            prefix (str, default='2'): 
1179                A string prepended to duplicate sample names in `snpobj` when `force_samples=True`. 
1180                Duplicates are renamed from `<sample_name>` to `<prefix>:<sample_name>`. For instance, 
1181                if `prefix='2'` and there is a conflict with a sample called "sample_1", it becomes "2:sample_1".
1182            inplace (bool, default=False): 
1183                If True, modifies `self` in place. If False, returns a new `SNPObject` with the merged samples. 
1184                Default is False.
1185
1186        Returns:
1187            **Optional[SNPObject]**: A new SNPObject containing the merged sample data.
1188        """
1189        # Merge calldata_gt if present and compatible
1190        if self.calldata_gt is not None and snpobj.calldata_gt is not None:
1191            if self.calldata_gt.shape[0] != snpobj.calldata_gt.shape[0]:
1192                raise ValueError(
1193                    f"Cannot merge SNPObjects: Mismatch in the number of SNPs in `calldata_gt`.\n"
1194                    f"`self.calldata_gt` has {self.calldata_gt.shape[0]} SNPs, "
1195                    f"while `snpobj.calldata_gt` has {snpobj.calldata_gt.shape[0]} SNPs."
1196                )
1197            if self.are_strands_summed and not snpobj.are_strands_summed:
1198                raise ValueError(
1199                    "Cannot merge SNPObjects: `self` has summed strands, but `snpobj` does not.\n"
1200                    "Ensure both objects have the same genotype summation state before merging."
1201                )
1202            if not self.are_strands_summed and snpobj.are_strands_summed:
1203                raise ValueError(
1204                    "Cannot merge SNPObjects: `snpobj` has summed strands, but `self` does not.\n"
1205                    "Ensure both objects have the same genotype summation state before merging."
1206                )
1207            calldata_gt = np.concatenate([self.calldata_gt, snpobj.calldata_gt], axis=1)
1208        else:
1209            calldata_gt = None
1210
1211        # Merge samples if present and compatible, handling duplicates if `force_samples=True`
1212        if self.samples is not None and snpobj.samples is not None:
1213            overlapping_samples = set(self.samples).intersection(set(snpobj.samples))
1214            if overlapping_samples:
1215                if not force_samples:
1216                    raise ValueError(
1217                        f"Cannot merge SNPObjects: Found overlapping sample names {overlapping_samples}.\n"
1218                        "Samples must be strictly non-overlapping. To allow merging with renaming, set `force_samples=True`."
1219                    )
1220                else:
1221                    # Rename duplicate samples by prepending the file index
1222                    renamed_samples = [f"{prefix}:{sample}" if sample in overlapping_samples else sample for sample in snpobj.samples]
1223                    samples = np.concatenate([self.samples, renamed_samples], axis=0)
1224            else:
1225                samples = np.concatenate([self.samples, snpobj.samples], axis=0)
1226        else:
1227            samples = None
1228
1229        # Merge LAI data if present and compatible
1230        if self.calldata_lai is not None and snpobj.calldata_lai is not None:
1231            if self.calldata_lai.ndim != snpobj.calldata_lai.ndim:
1232                raise ValueError(
1233                    f"Cannot merge SNPObjects: Mismatch in `calldata_lai` dimensions.\n"
1234                    f"`self.calldata_lai` has {self.calldata_lai.ndim} dimensions, "
1235                    f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.ndim} dimensions."
1236                )
1237            if self.calldata_lai.shape[0] != snpobj.calldata_lai.shape[0]:
1238                raise ValueError(
1239                    f"Cannot merge SNPObjects: Mismatch in the number of SNPs in `calldata_lai`.\n"
1240                    f"`self.calldata_lai` has {self.calldata_lai.shape[0]} SNPs, "
1241                    f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.shape[0]} SNPs."
1242                )
1243            calldata_lai = np.concatenate([self.calldata_lai, snpobj.calldata_lai], axis=1)
1244        else:
1245            calldata_lai = None
1246
1247        if inplace:
1248            self.calldata_gt = calldata_gt
1249            self.calldata_lai = calldata_lai
1250            self.samples = samples
1251            return self
1252
1253        # Create and return a new SNPObject containing the merged samples
1254        return SNPObject(
1255            calldata_gt=calldata_gt,
1256            samples=samples,
1257            variants_ref=self.variants_ref,
1258            variants_alt=self.variants_alt,
1259            variants_chrom=self.variants_chrom,
1260            variants_filter_pass=self.variants_filter_pass,
1261            variants_id=self.variants_id,
1262            variants_pos=self.variants_pos,
1263            variants_qual=self.variants_qual,
1264            calldata_lai=calldata_lai,
1265            ancestry_map=self.ancestry_map
1266        )
1267    
1268    def concat(
1269        self,
1270        snpobj: 'SNPObject', 
1271        inplace: bool = False
1272    ) -> Optional['SNPObject']:
1273        """
1274        Concatenate self with snpobj along the SNP axis.
1275
1276        This method expects both SNPObjects to contain the same set of samples in the same order, 
1277        and that the chromosome(s) in snpobj follow (i.e. have higher numeric identifiers than) 
1278        those in self.
1279
1280        Args:
1281            snpobj (SNPObject):
1282                The SNPObject to concatenate SNPs with.
1283            inplace (bool, default=False):
1284                If True, modifies `self` in place. If False, returns a new `SNPObject` with the concatenated SNPs. 
1285                Default is False.
1286        
1287        Returns:
1288            **Optional[SNPObject]**: A new SNPObject containing the concatenated SNP data.
1289        """
1290        # Merge calldata_gt if present and compatible
1291        if self.calldata_gt is not None and snpobj.calldata_gt is not None:
1292            if self.calldata_gt.shape[1] != snpobj.calldata_gt.shape[1]:
1293                raise ValueError(
1294                    f"Cannot merge SNPObjects: Mismatch in the number of samples in `calldata_gt`.\n"
1295                    f"`self.calldata_gt` has {self.calldata_gt.shape[1]} samples, "
1296                    f"while `snpobj.calldata_gt` has {snpobj.calldata_gt.shape[1]} samples."
1297                )
1298            if self.are_strands_summed and not snpobj.are_strands_summed:
1299                raise ValueError(
1300                    "Cannot merge SNPObjects: `self` has summed strands, but `snpobj` does not.\n"
1301                    "Ensure both objects have the same genotype summation state before merging."
1302                )
1303            if not self.are_strands_summed and snpobj.are_strands_summed:
1304                raise ValueError(
1305                    "Cannot merge SNPObjects: `snpobj` has summed strands, but `self` does not.\n"
1306                    "Ensure both objects have the same genotype summation state before merging."
1307                )
1308            calldata_gt = np.concatenate([self.calldata_gt, snpobj.calldata_gt], axis=0)
1309        else:
1310            calldata_gt = None
1311
1312        # Merge SNP-related attributes if present
1313        attributes = [
1314            'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 'variants_id', 'variants_pos', 'variants_qual'
1315        ]
1316        merged_attrs = {}
1317        for attr in attributes:
1318            self_attr = getattr(self, attr, None)
1319            obj_attr = getattr(snpobj, attr, None)
1320
1321            # Concatenate if both present
1322            if self_attr is not None and obj_attr is not None:
1323                merged_attrs[attr] = np.concatenate([self_attr, obj_attr], axis=0)
1324            else:
1325                # If either is None, store None
1326                merged_attrs[attr] = None
1327
1328        # Merge LAI data if present and compatible
1329        if self.calldata_lai is not None and snpobj.calldata_lai is not None:
1330            if self.calldata_lai.ndim != snpobj.calldata_lai.ndim:
1331                raise ValueError(
1332                    f"Cannot merge SNPObjects: Mismatch in `calldata_lai` dimensions.\n"
1333                    f"`self.calldata_lai` has {self.calldata_lai.ndim} dimensions, "
1334                    f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.ndim} dimensions."
1335                )
1336            if self.calldata_lai.shape[1] != snpobj.calldata_lai.shape[1]:
1337                raise ValueError(
1338                    f"Cannot merge SNPObjects: Mismatch in the number of samples in `calldata_lai`.\n"
1339                    f"`self.calldata_lai` has {self.calldata_lai.shape[1]} samples, "
1340                    f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.shape[1]} samples."
1341                )
1342            calldata_lai = np.concatenate([self.calldata_lai, snpobj.calldata_lai], axis=0)
1343        else:
1344            calldata_lai = None
1345        
1346        if inplace:
1347            self.calldata_gt = calldata_gt
1348            self.calldata_lai = calldata_lai
1349            for attr in attributes:
1350                self[attr] = merged_attrs[attr]
1351            return self
1352        
1353        # Create and return a new SNPObject containing the concatenated SNPs
1354        return SNPObject(
1355            calldata_gt=calldata_gt,
1356            calldata_lai=calldata_lai,
1357            samples=self.samples,
1358            variants_ref=merged_attrs['variants_ref'],
1359            variants_alt=merged_attrs['variants_alt'],
1360            variants_chrom=merged_attrs['variants_chrom'],
1361            variants_id=merged_attrs['variants_id'],
1362            variants_pos=merged_attrs['variants_pos'],
1363            variants_qual=merged_attrs['variants_qual'],
1364            variants_filter_pass=merged_attrs['variants_filter_pass'],
1365            ancestry_map=self.ancestry_map
1366        )
1367
1368    def remove_strand_ambiguous_variants(self, inplace: bool = False) -> Optional['SNPObject']:
1369        """
1370        A strand-ambiguous variant has reference (`variants_ref`) and alternate (`variants_alt`) alleles 
1371        in the pairs A/T, T/A, C/G, or G/C, where both alleles are complementary and thus indistinguishable 
1372        in terms of strand orientation.
1373
1374        Args:
1375            inplace (bool, default=False): 
1376                If True, modifies `self` in place. If False, returns a new `SNPObject` with the 
1377                strand-ambiguous variants removed. Default is False.
1378
1379        Returns:
1380            **Optional[SNPObject]:** A new `SNPObject` with non-ambiguous variants only if `inplace=False`. 
1381            If `inplace=True`, modifies `self` in place and returns None.
1382        """
1383        # Identify strand-ambiguous SNPs using vectorized comparisons
1384        is_AT = (self['variants_ref'] == 'A') & (self['variants_alt'] == 'T')
1385        is_TA = (self['variants_ref'] == 'T') & (self['variants_alt'] == 'A')
1386        is_CG = (self['variants_ref'] == 'C') & (self['variants_alt'] == 'G')
1387        is_GC = (self['variants_ref'] == 'G') & (self['variants_alt'] == 'C')
1388
1389        # Create a combined mask for all ambiguous variants
1390        ambiguous_mask = is_AT | is_TA | is_CG | is_GC
1391        non_ambiguous_idx = np.where(~ambiguous_mask)[0]
1392
1393        # Count each type of ambiguity using numpy's sum on boolean arrays
1394        A_T_count = np.sum(is_AT)
1395        T_A_count = np.sum(is_TA)
1396        C_G_count = np.sum(is_CG)
1397        G_C_count = np.sum(is_GC)
1398
1399        # Log the counts of each type of strand-ambiguous variants
1400        total_ambiguous = A_T_count + T_A_count + C_G_count + G_C_count
1401        log.info(f'{A_T_count} ambiguities of A-T type.')
1402        log.info(f'{T_A_count} ambiguities of T-A type.')
1403        log.info(f'{C_G_count} ambiguities of C-G type.')
1404        log.info(f'{G_C_count} ambiguities of G-C type.')
1405
1406        # Filter out ambiguous variants and keep non-ambiguous ones
1407        log.debug(f'Removing {total_ambiguous} strand-ambiguous variants...')
1408        return self.filter_variants(indexes=non_ambiguous_idx, include=True, inplace=inplace)
1409
1410    def correct_flipped_variants(
1411        self, 
1412        snpobj: 'SNPObject', 
1413        check_complement: bool = True, 
1414        index_by: str = 'pos', 
1415        common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None,
1416        log_stats: bool = True,
1417        inplace: bool = False
1418    ) -> Optional['SNPObject']:
1419        """
1420        Correct flipped variants between between `self` and a reference `snpobj`, where reference (`variants_ref`) 
1421        and alternate (`variants_alt`) alleles are swapped.
1422
1423        **Flip Detection Based on `check_complement`:**
1424
1425        - If `check_complement=False`, only direct allele swaps are considered:
1426            1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`.
1427
1428        - If `check_complement=True`, both direct and complementary swaps are considered, with four possible cases:
1429            1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`.
1430            2. **Complement Swap of Ref:** `complement(self.variants_ref) == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`.
1431            3. **Complement Swap of Alt:** `self.variants_ref == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`.
1432            4. **Complement Swap of both Ref and Alt:** `complement(self.variants_ref) == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`.
1433
1434        **Note:** Variants where `self.variants_ref == self.variants_alt` are ignored as they are ambiguous.
1435
1436        **Correction Process:** 
1437        - Swaps `variants_ref` and `variants_alt` alleles in `self` to align with `snpobj`.
1438        - Flips `calldata_gt` values (0 becomes 1, and 1 becomes 0) to match the updated allele configuration.
1439
1440        Args:
1441            snpobj (SNPObject): 
1442                The reference SNPObject to compare against.
1443            check_complement (bool, default=True): 
1444                If True, also checks for complementary base pairs (A/T, T/A, C/G, and G/C) when identifying swapped variants.
1445                Default is True.
1446            index_by (str, default='pos'): 
1447                Criteria for matching variants. Options:
1448                - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'.
1449                - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'.
1450                - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'.
1451                Default is 'pos'.
1452            common_variants_intersection (tuple of arrays, optional): 
1453                Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 
1454                computed within the function.
1455            log_stats (bool, default=True): 
1456                If True, logs statistical information about matching and ambiguous alleles. Default is True.
1457            inplace (bool, default=False): 
1458                If True, modifies `self` in place. If False, returns a new `SNPObject` with corrected 
1459                flips. Default is False.
1460
1461        Returns:
1462            **Optional[SNPObject]**: 
1463                A new `SNPObject` with corrected flips if `inplace=False`. 
1464                If `inplace=True`, modifies `self` in place and returns None.
1465        """
1466        # Define complement mappings for nucleotides
1467        complement_map = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'}
1468
1469        # Helper function to get the complement of a base
1470        def get_complement(base: str) -> str:
1471            return complement_map.get(base, base)
1472
1473        # Get common variant indices if not provided
1474        if common_variants_intersection != None:
1475            query_idx, reference_idx = common_variants_intersection
1476        else:
1477            _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by)
1478
1479        # Log statistics on matching alleles if enabled
1480        if log_stats:
1481            matching_ref = np.sum(self['variants_ref'][query_idx] == snpobj['variants_ref'][reference_idx])
1482            matching_alt = np.sum(self['variants_alt'][query_idx] == snpobj['variants_alt'][reference_idx])
1483            ambiguous = np.sum(self['variants_ref'][query_idx] == self['variants_alt'][query_idx])
1484            log.info(f"Matching reference alleles (ref=ref'): {matching_ref}, Matching alternate alleles (alt=alt'): {matching_alt}.")
1485            log.info(f"Number of ambiguous alleles (ref=alt): {ambiguous}.")
1486
1487        # Identify indices where `ref` and `alt` alleles are swapped
1488        if not check_complement:
1489            # Simple exact match for swapped alleles
1490            swapped_ref = (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx])
1491            swapped_alt = (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx])
1492        else:
1493            # Check for swapped or complementary-swapped alleles
1494            swapped_ref = (
1495                (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) |
1496                (np.vectorize(get_complement)(self['variants_ref'][query_idx]) == snpobj['variants_alt'][reference_idx])
1497            )
1498            swapped_alt = (
1499                (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) |
1500                (np.vectorize(get_complement)(self['variants_alt'][query_idx]) == snpobj['variants_ref'][reference_idx])
1501            )
1502
1503        # Filter out ambiguous variants where `ref` and `alt` alleles match (ref=alt)
1504        not_ambiguous = (self['variants_ref'][query_idx] != self['variants_alt'][query_idx])
1505
1506        # Indices in `self` of flipped variants
1507        flip_idx_query = query_idx[swapped_ref & swapped_alt & not_ambiguous]
1508
1509        # Correct the identified variant flips
1510        if len(flip_idx_query) > 0:
1511            log.info(f'Correcting {len(flip_idx_query)} variant flips...')
1512
1513            temp_alts = self['variants_alt'][flip_idx_query]
1514            temp_refs = self['variants_ref'][flip_idx_query]
1515
1516            # Correct the variant flips based on whether the operation is in-place or not
1517            if inplace:
1518                self['variants_alt'][flip_idx_query] = temp_refs
1519                self['variants_ref'][flip_idx_query] = temp_alts
1520                self['calldata_gt'][flip_idx_query] = 1 - self['calldata_gt'][flip_idx_query]
1521                return None
1522            else:
1523                snpobj = self.copy()
1524                snpobj['variants_alt'][flip_idx_query] = temp_refs
1525                snpobj['variants_ref'][flip_idx_query] = temp_alts
1526                snpobj['calldata_gt'][flip_idx_query] = 1 - snpobj['calldata_gt'][flip_idx_query]
1527                return snpobj
1528        else:
1529            log.info('No variant flips found to correct.')
1530            return self if not inplace else None
1531
1532    def remove_mismatching_variants(
1533        self, 
1534        snpobj: 'SNPObject', 
1535        index_by: str = 'pos', 
1536        common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None,
1537        inplace: bool = False
1538    ) -> Optional['SNPObject']:
1539        """
1540        Remove variants from `self`, where reference (`variants_ref`) and/or alternate (`variants_alt`) alleles 
1541        do not match with a reference `snpobj`.
1542
1543        Args:
1544            snpobj (SNPObject): 
1545                The reference SNPObject to compare against.
1546            index_by (str, default='pos'): 
1547                Criteria for matching variants. Options:
1548                - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'.
1549                - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'.
1550                - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'.
1551                Default is 'pos'.
1552            common_variants_intersection (tuple of arrays, optional): 
1553                Precomputed indices of common variants between `self` and the reference `snpobj`.
1554                If None, the intersection is computed within the function.
1555            inplace (bool, default=False): 
1556                If True, modifies `self` in place. If False, returns a new `SNPObject` without 
1557                mismatching variants. Default is False.
1558
1559        Returns:
1560            **Optional[SNPObject]:** 
1561                A new `SNPObject` without mismatching variants if `inplace=False`. 
1562                If `inplace=True`, modifies `self` in place and returns None.
1563        """
1564        # Get common variant indices if not provided
1565        if common_variants_intersection is not None:
1566            query_idx, reference_idx = common_variants_intersection
1567        else:
1568            _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by)
1569
1570        # Vectorized comparison of `ref` and `alt` alleles
1571        ref_mismatch = self['variants_ref'][query_idx] != snpobj['variants_ref'][reference_idx]
1572        alt_mismatch = self['variants_alt'][query_idx] != snpobj['variants_alt'][reference_idx]
1573        mismatch_mask = ref_mismatch | alt_mismatch
1574
1575        # Identify indices in `self` of mismatching variants
1576        mismatch_idx = query_idx[mismatch_mask]
1577
1578        # Compute total number of variant mismatches
1579        total_mismatches = np.sum(mismatch_mask)
1580
1581        # Filter out mismatching variants
1582        log.debug(f'Removing {total_mismatches} mismatching variants...')
1583        return self.filter_variants(indexes=mismatch_idx, include=True, inplace=inplace)
1584
1585    def shuffle_variants(self, inplace: bool = False) -> Optional['SNPObject']:
1586        """
1587        Randomly shuffle the positions of variants in the SNPObject, ensuring that all associated 
1588        data (e.g., `calldata_gt` and variant-specific attributes) remain aligned.
1589
1590        Args:
1591            inplace (bool, default=False): 
1592                If True, modifies `self` in place. If False, returns a new `SNPObject` with 
1593                shuffled variants. Default is False.
1594
1595        Returns:
1596            **Optional[SNPObject]:** 
1597                A new `SNPObject` without shuffled variant positions if `inplace=False`. 
1598                If `inplace=True`, modifies `self` in place and returns None.
1599        """
1600        # Generate a random permutation index for shuffling variant positions
1601        shuffle_index = np.random.permutation(self.n_snps)
1602
1603        # Apply shuffling to all relevant attributes using the class's dictionary-like interface
1604        if inplace:
1605            for key in self.keys():
1606                if self[key] is not None:
1607                    if key == 'calldata_gt':
1608                        # `calldata_gt`` has a different shape, so it's shuffled along axis 0
1609                        self[key] = self[key][shuffle_index, ...]
1610                    elif 'variant' in key:
1611                        # snpobj attributes are 1D arrays
1612                        self[key] = np.asarray(self[key])[shuffle_index]
1613            return None
1614        else:
1615            shuffled_snpobj = self.copy()
1616            for key in shuffled_snpobj.keys():
1617                if shuffled_snpobj[key] is not None:
1618                    if key == 'calldata_gt':
1619                        shuffled_snpobj[key] = shuffled_snpobj[key][shuffle_index, ...]
1620                    elif 'variant' in key:
1621                        shuffled_snpobj[key] = np.asarray(shuffled_snpobj[key])[shuffle_index]
1622            return shuffled_snpobj
1623
1624    def set_empty_to_missing(self, inplace: bool = False) -> Optional['SNPObject']:
1625        """
1626        Replace empty strings `''` with missing values `'.'` in attributes of `self`.
1627
1628        Args:
1629            inplace (bool, default=False): 
1630                If True, modifies `self` in place. If False, returns a new `SNPObject` with empty 
1631                strings `''` replaced by missing values `'.'`. Default is False.
1632
1633        Returns:
1634            **Optional[SNPObject]:** 
1635                A new `SNPObject` with empty strings replaced if `inplace=False`. 
1636                If `inplace=True`, modifies `self` in place and returns None.
1637        """
1638        if inplace:
1639            if self.variants_alt is not None:
1640                self.variants_alt[self.variants_alt == ''] = '.'
1641            if self.variants_ref is not None:
1642                self.variants_ref[self.variants_ref == ''] = '.'
1643            if self.variants_qual is not None:
1644                self.variants_qual = self.variants_qual.astype(str)
1645                self.variants_qual[(self.variants_qual == '') | (self.variants_qual == 'nan')] = '.'
1646            if self.variants_chrom is not None:
1647                self.variants_chrom = self.variants_chrom.astype(str)
1648                self.variants_chrom[self.variants_chrom == ''] = '.'
1649            if self.variants_filter_pass is not None:
1650                self.variants_filter_pass[self.variants_filter_pass == ''] = '.'
1651            if self.variants_id is not None:
1652                self.variants_id[self.variants_id == ''] = '.'
1653            return self
1654        else:
1655            snpobj = self.copy()
1656            if snpobj.variants_alt is not None:
1657                snpobj.variants_alt[snpobj.variants_alt == ''] = '.'
1658            if snpobj.variants_ref is not None:
1659                snpobj.variants_ref[snpobj.variants_ref == ''] = '.'
1660            if snpobj.variants_qual is not None:
1661                snpobj.variants_qual = snpobj.variants_qual.astype(str)
1662                snpobj.variants_qual[(snpobj.variants_qual == '') | (snpobj.variants_qual == 'nan')] = '.'
1663            if snpobj.variants_chrom is not None:
1664                snpobj.variants_chrom[snpobj.variants_chrom == ''] = '.'
1665            if snpobj.variants_filter_pass is not None:
1666                snpobj.variants_filter_pass[snpobj.variants_filter_pass == ''] = '.'
1667            if snpobj.variants_id is not None:
1668                snpobj.variants_id[snpobj.variants_id == ''] = '.'
1669            return snpobj
1670
1671    def convert_to_window_level(
1672        self,
1673        window_size: Optional[int] = None,
1674        physical_pos: Optional[np.ndarray] = None,
1675        chromosomes: Optional[np.ndarray] = None,
1676        window_sizes: Optional[np.ndarray] = None,
1677        laiobj: Optional['LocalAncestryObject'] = None
1678    ) -> 'LocalAncestryObject':
1679        """
1680        Aggregate the `calldata_lai` attribute into genomic windows within a 
1681        `snputils.ancestry.genobj.LocalAncestryObject`.
1682
1683        **Options for defining windows (in order of precedence):**
1684
1685        1. **Fixed window size**:
1686        - Use `window_size` to specify how many SNPs go into each window. The last window on each 
1687        chromosome may be larger if SNPs are not evenly divisible by the size.
1688
1689        2. **Custom start and end positions**:
1690        - Provide `physical_pos` (2D array of shape (n_windows, 2)) as the [start, end] base-pair 
1691         coordinates for each window. 
1692        - If `chromosomes` is not provided and `self` has exactly one chromosome, all windows are 
1693        assumed to belong to that chromosome. 
1694        - If multiple chromosomes exist but `chromosomes` is missing, an error will be raised.
1695        - Optionally, provide `window_sizes` to store the SNP count per-window.
1696
1697        3. **Matching existing windows**:
1698        - Reuse window definitions (`physical_pos`, `chromosomes`, `window_sizes`) from an existing `laiobj`.
1699
1700        Args:
1701            window_size (int, optional): 
1702                Number of SNPs in each window if defining fixed-size windows. If the total number of 
1703                SNPs in a chromosome is not evenly divisible by the window size, the last window on that 
1704                chromosome will include all remaining SNPs and therefore be larger than the specified size.
1705            physical_pos (array of shape (n_windows, 2), optional): 
1706                A 2D array containing the start and end physical positions for each window.
1707            chromosomes (array of shape (n_windows,), optional): 
1708                An array with chromosome numbers corresponding to each genomic window.
1709            window_sizes (array of shape (n_windows,), optional): 
1710                An array specifying the number of SNPs in each genomic window.
1711            laiobj (LocalAncestryObject, optional): 
1712                A reference `LocalAncestryObject` from which to copy existing window definitions.
1713
1714        Returns:
1715            **LocalAncestryObject:** 
1716                A LocalAncestryObject containing window-level ancestry data.
1717        """
1718        from snputils.ancestry.genobj.local import LocalAncestryObject
1719
1720        if window_size is None and physical_pos is None and laiobj is None:
1721            raise ValueError("One of `window_size`, `physical_pos`, or `laiobj` must be provided.")
1722        
1723        # Fixed window size
1724        if window_size is not None:
1725            physical_pos = []   # Boundaries [start, end] of each window
1726            chromosomes = []    # Chromosome for each window
1727            window_sizes = []   # Number of SNPs for each window
1728            for chrom in self.unique_chrom:
1729                # Extract indices corresponding to this chromosome
1730                mask_chrom = (self.variants_chrom == chrom)
1731                # Subset to this chromosome
1732                pos_chrom = self.variants_pos[mask_chrom]
1733                # Number of SNPs for this chromosome
1734                n_snps_chrom = pos_chrom.size
1735                
1736                # Initialize the start of the first window with the position of the first SNP
1737                current_start = self.variants_pos[0]
1738
1739                # Number of full windows with exactly `window_size` SNPs
1740                n_full_windows = n_snps_chrom // window_size
1741
1742                # Build all but the last window
1743                for i in range(n_full_windows-1):
1744                    current_end = self.variants_pos[(i+1) * window_size - 1]
1745                    physical_pos.append([current_start, current_end])
1746                    chromosomes.append(chrom)
1747                    window_sizes.append(window_size)
1748                    current_start = self.variants_pos[(i+1) * window_size]
1749                
1750                # Build the last window
1751                current_end = self.variants_pos[-1]
1752                physical_pos.append([current_start, current_end])
1753                chromosomes.append(chrom)
1754                window_sizes.append(n_snps_chrom - ((n_full_windows - 1) * window_size))
1755                
1756            physical_pos = np.array(physical_pos)
1757            chromosomes = np.array(chromosomes)
1758            window_sizes = np.array(window_sizes)
1759        
1760        # Custom start and end positions
1761        elif physical_pos is not None:
1762            # Check if there is exactly one chromosome
1763            if chromosomes is None:
1764                unique_chrom = self.unique_chrom
1765                if len(unique_chrom) == 1:
1766                    # We assume all windows belong to this single chromosome
1767                    single_chrom = unique_chrom[0]
1768                    chromosomes = np.array([single_chrom] * physical_pos.shape[0])
1769                else:
1770                    raise ValueError("Multiple chromosomes detected, but `chromosomes` was not provided.")
1771
1772        # Match existing windows to a reference laiobj
1773        elif laiobj is not None:
1774            physical_pos = laiobj.physical_pos
1775            chromosomes = laiobj.chromosomes
1776            window_sizes = laiobj.window_sizes
1777
1778        # Allocate an output LAI array
1779        n_windows = physical_pos.shape[0]
1780        n_samples = self.n_samples
1781        if self.calldata_lai.ndim == 3:
1782            lai = np.zeros((n_windows, n_samples, 2))
1783        else:
1784            lai = np.zeros((n_windows, n_samples*2))
1785
1786        # For each window, find the relevant SNPs and compute the mode of the ancestries
1787        for i, ((start, end), chrom) in enumerate(zip(physical_pos, chromosomes)):
1788            snps_mask = (
1789                (self.variants_chrom == chrom) &
1790                (self.variants_pos >= start) &
1791                (self.variants_pos <= end)
1792            )
1793            if np.any(snps_mask):
1794                lai_mask = self.calldata_lai[snps_mask, ...]
1795                mode_ancestries = mode(lai_mask, axis=0, nan_policy='omit').mode
1796                lai[i] = mode_ancestries
1797            else:
1798                lai[i] = np.nan
1799
1800        # Generate haplotype labels, e.g. "Sample1.0", "Sample1.1"
1801        haplotypes = [f"{sample}.{i}" for sample in self.samples for i in range(2)]
1802
1803        # If original data was (n_snps, n_samples, 2), flatten to (n_windows, n_samples*2)
1804        if self.calldata_lai.ndim == 3:
1805            lai = lai.reshape(n_windows, -1)
1806
1807        # Aggregate into a LocalAncestryObject
1808        return LocalAncestryObject(
1809            haplotypes=haplotypes,
1810            lai=lai,
1811            samples=self.samples,
1812            ancestry_map=self.ancestry_map,
1813            window_sizes=window_sizes,
1814            physical_pos=physical_pos,
1815            chromosomes=chromosomes
1816        )
1817
1818    def save(self, file: Union[str, Path]) -> None:
1819        """
1820        Save the data stored in `self` to a specified file.
1821
1822        The format of the saved file is determined by the file extension provided in the `file` 
1823        argument. 
1824        
1825        **Supported formats:**
1826        
1827        - `.bed`: Binary PED (Plink) format.
1828        - `.pgen`: Plink2 binary genotype format.
1829        - `.vcf`: Variant Call Format.
1830        - `.pkl`: Pickle format for saving `self` in serialized form.
1831
1832        Args:
1833            file (str or pathlib.Path): 
1834                Path to the file where the data will be saved. The extension of the file determines the save format. 
1835                Supported extensions: `.bed`, `.pgen`, `.vcf`, `.pkl`.
1836        """
1837        ext = Path(file).suffix.lower()
1838        if ext == '.bed':
1839            self.save_bed(file)
1840        elif ext == '.pgen':
1841            self.save_pgen(file)
1842        elif ext == '.vcf':
1843            self.save_vcf(file)
1844        elif ext == '.pkl':
1845            self.save_pickle(file)
1846        else:
1847            raise ValueError(f"Unsupported file extension: {ext}")
1848
1849    def save_bed(self, file: Union[str, Path]) -> None:
1850        """
1851        Save the data stored in `self` to a `.bed` file.
1852
1853        Args:
1854            file (str or pathlib.Path): 
1855                Path to the file where the data will be saved. It should end with `.bed`. 
1856                If the provided path does not have this extension, it will be appended.
1857        """
1858        from snputils.snp.io.write.bed import BEDWriter
1859        writer = BEDWriter(snpobj=self, filename=file)
1860        writer.write()
1861
1862    def save_pgen(self, file: Union[str, Path]) -> None:
1863        """
1864        Save the data stored in `self` to a `.pgen` file.
1865
1866        Args:
1867            file (str or pathlib.Path): 
1868                Path to the file where the data will be saved. It should end with `.pgen`. 
1869                If the provided path does not have this extension, it will be appended.
1870        """
1871        from snputils.snp.io.write.pgen import PGENWriter
1872        writer = PGENWriter(snpobj=self, filename=file)
1873        writer.write()
1874
1875    def save_vcf(self, file: Union[str, Path]) -> None:
1876        """
1877        Save the data stored in `self` to a `.vcf` file.
1878
1879        Args:
1880            file (str or pathlib.Path): 
1881                Path to the file where the data will be saved. It should end with `.vcf`. 
1882                If the provided path does not have this extension, it will be appended.
1883        """
1884        from snputils.snp.io.write.vcf import VCFWriter
1885        writer = VCFWriter(snpobj=self, filename=file)
1886        writer.write()
1887
1888    def save_pickle(self, file: Union[str, Path]) -> None:
1889        """
1890        Save `self` in serialized form to a `.pkl` file.
1891
1892        Args:
1893            file (str or pathlib.Path): 
1894                Path to the file where the data will be saved. It should end with `.pkl`. 
1895                If the provided path does not have this extension, it will be appended.
1896        """
1897        import pickle
1898        with open(file, 'wb') as file:
1899            pickle.dump(self, file)
1900
1901    @staticmethod
1902    def _match_to_replace(val: Union[str, int, float], dictionary: Dict[Any, Any], regex: bool = True) -> Union[str, int, float]:
1903        """
1904        Find a matching key in the provided dictionary for the given value `val`
1905        and replace it with the corresponding value.
1906
1907        Args:
1908            val (str, int, or float): 
1909                The value to be matched and potentially replaced.
1910            dictionary (Dict): 
1911                A dictionary containing keys and values for matching and replacement.
1912                The keys should match the data type of `val`.
1913            regex (bool): 
1914                If True, interprets keys in `dictionary` as regular expressions.
1915                Default is True.
1916
1917        Returns:
1918            str, int, or float: 
1919                The replacement value from `dictionary` if a match is found; otherwise, the original `val`.
1920        """
1921        if regex:
1922            # Use regular expression matching to find replacements
1923            for key, value in dictionary.items():
1924                if isinstance(key, str):
1925                    match = re.match(key, val)
1926                    if match:
1927                        # Replace using the first matching regex pattern
1928                        return re.sub(key, value, val)
1929            # Return the original value if no regex match is found
1930            return val
1931        else:
1932            # Return the value for `val` if present in `dictionary`; otherwise, return `val`
1933            return dictionary.get(val, val)
1934
1935    @staticmethod
1936    def _get_chromosome_number(chrom_string: str) -> Union[int, str]:
1937        """
1938        Extracts the chromosome number from the given chromosome string.
1939
1940        Args:
1941            chrom_string (str): 
1942                The chromosome identifier.
1943
1944        Returns:
1945            int or str: 
1946                The numeric representation of the chromosome if detected. 
1947                Returns 10001 for 'X' or 'chrX', 10002 for 'Y' or 'chrY', 
1948                and the original `chrom_string` if unrecognized.
1949        """
1950        if chrom_string.isdigit():
1951            return int(chrom_string)
1952        else:
1953            chrom_num = re.search(r'\d+', chrom_string)
1954            if chrom_num:
1955                return int(chrom_num.group())
1956            elif chrom_string.lower() in ['x', 'chrx']:
1957                return 10001
1958            elif chrom_string.lower() in ['y', 'chry']:
1959                return 10002
1960            else:
1961                log.warning(f"Chromosome nomenclature not standard. Chromosome: {chrom_string}")
1962                return chrom_string
1963
1964    def _sanity_check(self) -> None:
1965        """
1966        Perform sanity checks to ensure LAI and ancestry map consistency.
1967
1968        This method checks that all unique ancestries in the LAI data are represented 
1969        in the ancestry map if it is provided.
1970        """
1971        if self.__calldata_lai is not None and self.__ancestry_map is not None:
1972            unique_ancestries = np.unique(self.__calldata_lai)
1973            missing_ancestries = [anc for anc in unique_ancestries if str(anc) not in self.__ancestry_map]
1974            if missing_ancestries:
1975                warnings.warn(f"Missing ancestries in ancestry_map: {missing_ancestries}")

A class for Single Nucleotide Polymorphism (SNP) data, with optional support for SNP-level Local Ancestry Information (LAI).

SNPObject( calldata_gt: numpy.ndarray | None = None, samples: numpy.ndarray | None = None, variants_ref: numpy.ndarray | None = None, variants_alt: numpy.ndarray | None = None, variants_chrom: numpy.ndarray | None = None, variants_filter_pass: numpy.ndarray | None = None, variants_id: numpy.ndarray | None = None, variants_pos: numpy.ndarray | None = None, variants_qual: numpy.ndarray | None = None, calldata_lai: numpy.ndarray | None = None, ancestry_map: Dict[str, str] | None = None)
22    def __init__(
23        self,
24        calldata_gt: Optional[np.ndarray] = None,
25        samples: Optional[np.ndarray] = None,
26        variants_ref: Optional[np.ndarray] = None,
27        variants_alt: Optional[np.ndarray] = None,
28        variants_chrom: Optional[np.ndarray] = None,
29        variants_filter_pass: Optional[np.ndarray] = None,
30        variants_id: Optional[np.ndarray] = None,
31        variants_pos: Optional[np.ndarray] = None,
32        variants_qual: Optional[np.ndarray] = None,
33        calldata_lai: Optional[np.ndarray] = None,
34        ancestry_map: Optional[Dict[str, str]] = None
35    ) -> None:
36        """
37        Args:
38            calldata_gt (array, optional): 
39                An array containing genotype data for each sample. This array can be either 2D with shape 
40                `(n_snps, n_samples)` if the paternal and maternal strands are summed, or 3D with shape 
41                `(n_snps, n_samples, 2)` if the strands are kept separate.
42            samples (array of shape (n_sampels,), optional): 
43                An array containing unique sample identifiers.
44            variants_ref (array of shape (n_snps,), optional): 
45                An array containing the reference allele for each SNP.
46            variants_alt (array of shape (n_snps,), optional): 
47                An array containing the alternate allele for each SNP.
48            variants_chrom (array of shape (n_snps,), optional): 
49                An array containing the chromosome for each SNP.
50            variants_filter_pass (array of shape (n_snps,), optional): 
51                An array indicating whether each SNP passed control checks.
52            variants_id (array of shape (n_snps,), optional): 
53                An array containing unique identifiers (IDs) for each SNP.
54            variants_pos (array of shape (n_snps,), optional): 
55                An array containing the chromosomal positions for each SNP.
56            variants_qual (array of shape (n_snps,), optional): 
57                An array containing the Phred-scaled quality score for each SNP.
58            calldata_lai (array, optional): 
59                An array containing the ancestry for each SNP. This array can be either 2D with shape
60                `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2).
61            ancestry_map (dict of str to str, optional): 
62                A dictionary mapping ancestry codes to region names.
63        """
64        self.__calldata_gt = calldata_gt
65        self.__samples = samples
66        self.__variants_ref = variants_ref
67        self.__variants_alt = variants_alt
68        self.__variants_chrom = variants_chrom
69        self.__variants_filter_pass = variants_filter_pass
70        self.__variants_id = variants_id
71        self.__variants_pos = variants_pos
72        self.__variants_qual = variants_qual
73        self.__calldata_lai = calldata_lai
74        self.__ancestry_map = ancestry_map
75
76        self._sanity_check()
Arguments:
  • calldata_gt (array, optional): An array containing genotype data for each sample. This array can be either 2D with shape (n_snps, n_samples) if the paternal and maternal strands are summed, or 3D with shape (n_snps, n_samples, 2) if the strands are kept separate.
  • samples (array of shape (n_sampels,), optional): An array containing unique sample identifiers.
  • variants_ref (array of shape (n_snps,), optional): An array containing the reference allele for each SNP.
  • variants_alt (array of shape (n_snps,), optional): An array containing the alternate allele for each SNP.
  • variants_chrom (array of shape (n_snps,), optional): An array containing the chromosome for each SNP.
  • variants_filter_pass (array of shape (n_snps,), optional): An array indicating whether each SNP passed control checks.
  • variants_id (array of shape (n_snps,), optional): An array containing unique identifiers (IDs) for each SNP.
  • variants_pos (array of shape (n_snps,), optional): An array containing the chromosomal positions for each SNP.
  • variants_qual (array of shape (n_snps,), optional): An array containing the Phred-scaled quality score for each SNP.
  • calldata_lai (array, optional): An array containing the ancestry for each SNP. This array can be either 2D with shape (n_snps, n_samples*2), or 3D with shape (n_snps, n_samples, 2).
  • ancestry_map (dict of str to str, optional): A dictionary mapping ancestry codes to region names.
calldata_gt: numpy.ndarray
 98    @property
 99    def calldata_gt(self) -> np.ndarray:
100        """
101        Retrieve `calldata_gt`.
102
103        Returns:
104            **array:** 
105                An array containing genotype data for each sample. This array can be either 2D with shape 
106                `(n_snps, n_samples)` if the paternal and maternal strands are summed, or 3D with shape 
107                `(n_snps, n_samples, 2)` if the strands are kept separate.
108        """
109        return self.__calldata_gt

Retrieve calldata_gt.

Returns:

array: An array containing genotype data for each sample. This array can be either 2D with shape (n_snps, n_samples) if the paternal and maternal strands are summed, or 3D with shape (n_snps, n_samples, 2) if the strands are kept separate.

samples: numpy.ndarray | None
118    @property
119    def samples(self) -> Optional[np.ndarray]:
120        """
121        Retrieve `samples`.
122
123        Returns:
124            **array of shape (n_sampels,):** 
125                An array containing unique sample identifiers.
126        """
127        return self.__samples

Retrieve samples.

Returns:

array of shape (n_sampels,): An array containing unique sample identifiers.

variants_ref: numpy.ndarray | None
136    @property
137    def variants_ref(self) -> Optional[np.ndarray]:
138        """
139        Retrieve `variants_ref`.
140
141        Returns:
142            **array of shape (n_snps,):** An array containing the reference allele for each SNP.
143        """
144        return self.__variants_ref

Retrieve variants_ref.

Returns:

array of shape (n_snps,): An array containing the reference allele for each SNP.

variants_alt: numpy.ndarray | None
153    @property
154    def variants_alt(self) -> Optional[np.ndarray]:
155        """
156        Retrieve `variants_alt`.
157
158        Returns:
159            **array of shape (n_snps,):** An array containing the alternate allele for each SNP.
160        """
161        return self.__variants_alt

Retrieve variants_alt.

Returns:

array of shape (n_snps,): An array containing the alternate allele for each SNP.

variants_chrom: numpy.ndarray | None
170    @property
171    def variants_chrom(self) -> Optional[np.ndarray]:
172        """
173        Retrieve `variants_chrom`.
174
175        Returns:
176            **array of shape (n_snps,):** An array containing the chromosome for each SNP.
177        """
178        return self.__variants_chrom

Retrieve variants_chrom.

Returns:

array of shape (n_snps,): An array containing the chromosome for each SNP.

variants_filter_pass: numpy.ndarray | None
187    @property
188    def variants_filter_pass(self) -> Optional[np.ndarray]:
189        """
190        Retrieve `variants_filter_pass`.
191
192        Returns:
193            **array of shape (n_snps,):** An array indicating whether each SNP passed control checks.
194        """
195        return self.__variants_filter_pass

Retrieve variants_filter_pass.

Returns:

array of shape (n_snps,): An array indicating whether each SNP passed control checks.

variants_id: numpy.ndarray | None
204    @property
205    def variants_id(self) -> Optional[np.ndarray]:
206        """
207        Retrieve `variants_id`.
208
209        Returns:
210            **array of shape (n_snps,):** An array containing unique identifiers (IDs) for each SNP.
211        """
212        return self.__variants_id

Retrieve variants_id.

Returns:

array of shape (n_snps,): An array containing unique identifiers (IDs) for each SNP.

variants_pos: numpy.ndarray | None
221    @property
222    def variants_pos(self) -> Optional[np.ndarray]:
223        """
224        Retrieve `variants_pos`.
225
226        Returns:
227            **array of shape (n_snps,):** An array containing the chromosomal positions for each SNP.
228        """
229        return self.__variants_pos

Retrieve variants_pos.

Returns:

array of shape (n_snps,): An array containing the chromosomal positions for each SNP.

variants_qual: numpy.ndarray | None
238    @property
239    def variants_qual(self) -> Optional[np.ndarray]:
240        """
241        Retrieve `variants_qual`.
242
243        Returns:
244            **array of shape (n_snps,):** An array containing the Phred-scaled quality score for each SNP.
245        """
246        return self.__variants_qual

Retrieve variants_qual.

Returns:

array of shape (n_snps,): An array containing the Phred-scaled quality score for each SNP.

calldata_lai: numpy.ndarray | None
255    @property
256    def calldata_lai(self) -> Optional[np.ndarray]:
257        """
258        Retrieve `calldata_lai`.
259
260        Returns:
261            **array:** 
262                An array containing the ancestry for each SNP. This array can be either 2D with shape
263                `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2).
264        """
265        return self.__calldata_lai

Retrieve calldata_lai.

Returns:

array: An array containing the ancestry for each SNP. This array can be either 2D with shape (n_snps, n_samples*2), or 3D with shape (n_snps, n_samples, 2).

ancestry_map: Dict[str, str] | None
274    @property
275    def ancestry_map(self) -> Optional[Dict[str, str]]:
276        """
277        Retrieve `ancestry_map`.
278
279        Returns:
280            **dict of str to str:** A dictionary mapping ancestry codes to region names.
281        """
282        return self.__ancestry_map

Retrieve ancestry_map.

Returns:

dict of str to str: A dictionary mapping ancestry codes to region names.

n_samples: int
291    @property
292    def n_samples(self) -> int:
293        """
294        Retrieve `n_samples`.
295
296        Returns:
297            **int:** The total number of samples.
298        """
299        if self.__samples is not None:
300            return len(self.__samples)
301        elif self.__calldata_gt is not None:
302            return self.__calldata_gt.shape[1]
303        elif self.__calldata_lai is not None:
304            if self.__calldata_lai.ndim == 2:
305                return self.__calldata_lai.shape[1] // 2
306            elif self.__calldata_lai.ndim == 3:
307                return self.__calldata_lai.shape[1]
308        else:
309            raise ValueError("Unable to determine the total number of samples: no relevant data is available.")

Retrieve n_samples.

Returns:

int: The total number of samples.

n_snps: int
311    @property
312    def n_snps(self) -> int:
313        """
314        Retrieve `n_snps`.
315
316        Returns:
317            **int:** The total number of SNPs.
318        """
319        # List of attributes that can indicate the number of SNPs
320        potential_attributes = [
321            self.__calldata_gt,
322            self.__variants_ref,
323            self.__variants_alt,
324            self.__variants_chrom,
325            self.__variants_filter_pass,
326            self.__variants_id,
327            self.__variants_pos,
328            self.__variants_qual,
329            self.__calldata_lai
330        ]
331
332        # Check each attribute for its first dimension, which corresponds to `n_snps`
333        for attr in potential_attributes:
334            if attr is not None:
335                return attr.shape[0]
336
337        raise ValueError("Unable to determine the total number of SNPs: no relevant data is available.")

Retrieve n_snps.

Returns:

int: The total number of SNPs.

n_chrom: int | None
339    @property
340    def n_chrom(self) -> Optional[int]:
341        """
342        Retrieve `n_chrom`.
343
344        Returns:
345            **int:** The total number of unique chromosomes in `variants_chrom`.
346        """
347        if self.variants_chrom is None:
348            warnings.warn("Chromosome data `variants_chrom` is None.")
349            return None
350
351        return len(self.unique_chrom)

Retrieve n_chrom.

Returns:

int: The total number of unique chromosomes in variants_chrom.

n_ancestries: int
353    @property
354    def n_ancestries(self) -> int:
355        """
356        Retrieve `n_ancestries`.
357
358        Returns:
359            **int:** The total number of unique ancestries.
360        """
361        if self.__calldata_lai is not None:
362            return len(np.unique(self.__calldata_lai))
363        else:
364            raise ValueError("Unable to determine the total number of ancestries: no relevant data is available.")

Retrieve n_ancestries.

Returns:

int: The total number of unique ancestries.

unique_chrom: numpy.ndarray | None
366    @property
367    def unique_chrom(self) -> Optional[np.ndarray]:
368        """
369        Retrieve `unique_chrom`.
370
371        Returns:
372            **array:** The unique chromosome names in `variants_chrom`, preserving their order of appearance.
373        """
374        if self.variants_chrom is None:
375            warnings.warn("Chromosome data `variants_chrom` is None.")
376            return None
377
378        # Identify unique chromosome names and their first indexes of occurrence
379        _, idx = np.unique(self.variants_chrom, return_index=True)
380        # Return chromosome names sorted by their first occurrence to maintain original order
381        return self.variants_chrom[np.sort(idx)]

Retrieve unique_chrom.

Returns:

array: The unique chromosome names in variants_chrom, preserving their order of appearance.

are_strands_summed: bool
383    @property
384    def are_strands_summed(self) -> bool:
385        """
386        Retrieve `are_strands_summed`.
387        
388        Returns:
389            **bool:** 
390                True if the maternal and paternal strands have been summed together, which is indicated by 
391                `calldata_gt` having shape `(n_samples, n_snps)`. False if the strands are stored separately, 
392                indicated by `calldata_gt` having shape `(n_samples, n_snps, 2)`.
393        """
394        if self.calldata_gt is None:
395            warnings.warn("Genotype data `calldata_gt` is None.")
396            return None
397        
398        return self.calldata_gt.ndim == 2

Retrieve are_strands_summed.

Returns:

bool: True if the maternal and paternal strands have been summed together, which is indicated by calldata_gt having shape (n_samples, n_snps). False if the strands are stored separately, indicated by calldata_gt having shape (n_samples, n_snps, 2).

def copy(self) -> 'SNPObject':
400    def copy(self) -> 'SNPObject':
401        """
402        Create and return a copy of `self`.
403
404        Returns:
405            **SNPObject:** 
406                A new instance of the current object.
407        """
408        return copy.deepcopy(self)

Create and return a copy of self.

Returns:

SNPObject: A new instance of the current object.

def keys(self) -> List[str]:
410    def keys(self) -> List[str]:
411        """
412        Retrieve a list of public attribute names for `self`.
413
414        Returns:
415            **list of str:** 
416                A list of attribute names, with internal name-mangling removed, 
417                for easier reference to public attributes in the instance.
418        """
419        return [attr.replace('_SNPObject__', '') for attr in vars(self)]

Retrieve a list of public attribute names for self.

Returns:

list of str: A list of attribute names, with internal name-mangling removed, for easier reference to public attributes in the instance.

def sum_strands(self, inplace: bool = False) -> ForwardRef('SNPObject') | None:
421    def sum_strands(self, inplace: bool = False) -> Optional['SNPObject']:
422        """
423        Sum paternal and maternal strands.
424
425        Args:
426            inplace (bool, default=False): 
427                If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 
428                filtered. Default is False.
429
430        Returns:
431            **Optional[SNPObject]:** 
432                A new `SNPObject` with summed strands if `inplace=False`. 
433                If `inplace=True`, modifies `self` in place and returns None.
434        """
435        if self.calldata_gt is None:
436            warnings.warn("Genotype data `calldata_gt` is None.")
437            return None if not inplace else self
438
439        if self.are_strands_summed:
440            warnings.warn("Genotype data `calldata_gt` is already summed.")
441            return self if inplace else self.copy()
442        
443        if inplace:
444            self.calldata_gt = self.calldata_gt.sum(axis=2, dtype=np.int8)
445            return self
446        else:
447            snpobj = self.copy()
448            snpobj.calldata_gt = self.calldata_gt.sum(axis=2, dtype=np.int8)
449            return snpobj

Sum paternal and maternal strands.

Arguments:
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with the variants filtered. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject with summed strands if inplace=False. If inplace=True, modifies self in place and returns None.

def filter_variants( self, chrom: str | Sequence[str] | numpy.ndarray | None = None, pos: int | Sequence[int] | numpy.ndarray | None = None, indexes: int | Sequence[int] | numpy.ndarray | None = None, include: bool = True, inplace: bool = False) -> ForwardRef('SNPObject') | None:
451    def filter_variants(
452            self, 
453            chrom: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 
454            pos: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 
455            indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 
456            include: bool = True, 
457            inplace: bool = False
458        ) -> Optional['SNPObject']:
459        """
460        Filter variants based on specified chromosome names, variant positions, or variant indexes.
461
462        This method updates the `calldata_gt`, `variants_ref`, `variants_alt`, 
463        `variants_chrom`, `variants_filter_pass`, `variants_id`, `variants_pos`,  
464        `variants_qual`, and `lai` attributes to include or exclude the specified variants. The filtering 
465        criteria can be based on chromosome names, variant positions, or indexes. If multiple 
466        criteria are provided, their union is used for filtering. The order of the variants is preserved.
467        
468        Negative indexes are supported and follow 
469        [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html).
470
471        Args:
472            chrom (str or array_like of str, optional): 
473                Chromosome(s) to filter variants by. Can be a single chromosome as a string or a sequence 
474                of chromosomes. If both `chrom` and `pos` are provided, they must either have matching lengths 
475                (pairing each chromosome with a position) or `chrom` should be a single value that applies to 
476                all positions in `pos`. Default is None. 
477            pos (int or array_like of int, optional): 
478                Position(s) to filter variants by. Can be a single position as an integer or a sequence of positions. 
479                If `chrom` is also provided, `pos` should either match `chrom` in length or `chrom` should be a 
480                single value. Default is None.
481            indexes (int or array_like of int, optional): 
482                Index(es) of the variants to include or exclude. Can be a single index or a sequence
483                of indexes. Negative indexes are supported. Default is None.
484            include (bool, default=True): 
485                If True, includes only the specified variants. If False, excludes the specified
486                variants. Default is True.
487            inplace (bool, default=False): 
488                If True, modifies `self` in place. If False, returns a new `SNPObject` with the variants 
489                filtered. Default is False.
490
491        Returns:
492            **Optional[SNPObject]:** 
493                A new `SNPObject` with the specified variants filtered if `inplace=False`. 
494                If `inplace=True`, modifies `self` in place and returns None.
495        """
496        if chrom is None and pos is None and indexes is None:
497            raise ValueError("At least one of 'chrom', 'pos', or 'indexes' must be provided.")
498
499        n_snps = self.n_snps
500
501        # Convert inputs to arrays for consistency
502        chrom = np.atleast_1d(chrom) if chrom is not None else None
503        pos = np.atleast_1d(pos) if pos is not None else None
504        indexes = np.atleast_1d(indexes) if indexes is not None else None
505
506        # Validate chrom and pos lengths if both are provided
507        if chrom is not None and pos is not None:
508            if len(chrom) != len(pos) and len(chrom) > 1:
509                raise ValueError(
510                    "When both 'chrom' and 'pos' are provided, they must either be of the same length "
511                    "or 'chrom' must be a single value."
512                )
513
514        # Create a mask for chromosome and position filtering
515        mask_combined = np.zeros(n_snps, dtype=bool)
516        if chrom is not None and pos is not None:
517            if len(chrom) == 1:
518                # Apply single chromosome to all positions in `pos`
519                mask_combined = (self['variants_chrom'] == chrom[0]) & np.isin(self['variants_pos'], pos)
520            else:
521                # Vectorized pair matching for chrom and pos
522                query_pairs = np.array(
523                    list(zip(chrom, pos)),
524                    dtype=[
525                        ('chrom', self['variants_chrom'].dtype),
526                        ('pos', self['variants_pos'].dtype)
527                    ]
528                )
529                data_pairs = np.array(
530                    list(zip(self['variants_chrom'], self['variants_pos'])),
531                    dtype=[
532                        ('chrom', self['variants_chrom'].dtype),
533                        ('pos', self['variants_pos'].dtype)
534                    ]
535                )
536                mask_combined = np.isin(data_pairs, query_pairs)
537
538        elif chrom is not None:
539            # Only chromosome filtering
540            mask_combined = np.isin(self['variants_chrom'], chrom)
541        elif pos is not None:
542            # Only position filtering
543            mask_combined = np.isin(self['variants_pos'], pos)
544
545        # Create mask based on indexes if provided
546        if indexes is not None:
547            # Validate indexes, allowing negative indexes
548            out_of_bounds_indexes = indexes[(indexes < -n_snps) | (indexes >= n_snps)]
549            if out_of_bounds_indexes.size > 0:
550                raise ValueError(f"One or more sample indexes are out of bounds.")
551
552            # Handle negative indexes and check for out-of-bounds indexes
553            adjusted_indexes = np.mod(indexes, n_snps)
554
555            # Create mask for specified indexes
556            mask_indexes = np.zeros(n_snps, dtype=bool)
557            mask_indexes[adjusted_indexes] = True
558
559            # Combine with `chrom` and `pos` mask using logical OR (union of all specified criteria)
560            mask_combined = mask_combined | mask_indexes
561
562        # Invert mask if `include` is False
563        if not include:
564            mask_combined = ~mask_combined
565
566        # Define keys to filter
567        keys = [
568            'calldata_gt', 'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 
569            'variants_id', 'variants_pos', 'variants_qual', 'calldata_lai'
570        ]
571
572        # Apply filtering based on inplace parameter
573        if inplace:
574            for key in keys:
575                if self[key] is not None:
576                    if self[key].ndim > 1:
577                        self[key] = np.asarray(self[key])[mask_combined, ...]
578                    else:
579                        self[key] = np.asarray(self[key])[mask_combined]
580
581            return None
582        else:
583            # Create A new `SNPObject` with filtered data
584            snpobj = self.copy()
585            for key in keys:
586                if snpobj[key] is not None:
587                    if snpobj[key].ndim > 1:
588                        snpobj[key] = np.asarray(snpobj[key])[mask_combined, ...]
589                    else:
590                        snpobj[key] = np.asarray(snpobj[key])[mask_combined]
591
592            return snpobj

Filter variants based on specified chromosome names, variant positions, or variant indexes.

This method updates the calldata_gt, variants_ref, variants_alt, variants_chrom, variants_filter_pass, variants_id, variants_pos,
variants_qual, and lai attributes to include or exclude the specified variants. The filtering criteria can be based on chromosome names, variant positions, or indexes. If multiple criteria are provided, their union is used for filtering. The order of the variants is preserved.

Negative indexes are supported and follow NumPy's indexing conventions.

Arguments:
  • chrom (str or array_like of str, optional): Chromosome(s) to filter variants by. Can be a single chromosome as a string or a sequence of chromosomes. If both chrom and pos are provided, they must either have matching lengths (pairing each chromosome with a position) or chrom should be a single value that applies to all positions in pos. Default is None.
  • pos (int or array_like of int, optional): Position(s) to filter variants by. Can be a single position as an integer or a sequence of positions. If chrom is also provided, pos should either match chrom in length or chrom should be a single value. Default is None.
  • indexes (int or array_like of int, optional): Index(es) of the variants to include or exclude. Can be a single index or a sequence of indexes. Negative indexes are supported. Default is None.
  • include (bool, default=True): If True, includes only the specified variants. If False, excludes the specified variants. Default is True.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with the variants filtered. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject with the specified variants filtered if inplace=False. If inplace=True, modifies self in place and returns None.

def filter_samples( self, samples: str | Sequence[str] | numpy.ndarray | None = None, indexes: int | Sequence[int] | numpy.ndarray | None = None, include: bool = True, reorder: bool = False, inplace: bool = False) -> ForwardRef('SNPObject') | None:
594    def filter_samples(
595            self, 
596            samples: Optional[Union[str, Sequence[str], np.ndarray, None]] = None,
597            indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None,
598            include: bool = True,
599            reorder: bool = False,
600            inplace: bool = False
601        ) -> Optional['SNPObject']:
602        """
603        Filter samples based on specified names or indexes.
604
605        This method updates the `samples` and `calldata_gt` attributes to include or exclude the specified 
606        samples. The order of the samples is preserved. Set `reorder=True` to match the ordering of the
607        provided `samples` and/or `indexes` lists when including.
608
609        If both samples and indexes are provided, any sample matching either a name in samples or an index in 
610        indexes will be included or excluded.
611
612        This method allows inclusion or exclusion of specific samples by their names or 
613        indexes. When both sample names and indexes are provided, the union of the specified samples 
614        is used. Negative indexes are supported and follow 
615        [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html).
616
617        Args:
618            samples (str or array_like of str, optional): 
619                 Name(s) of the samples to include or exclude. Can be a single sample name or a
620                 sequence of sample names. Default is None.
621            indexes (int or array_like of int, optional):
622                Index(es) of the samples to include or exclude. Can be a single index or a sequence
623                of indexes. Negative indexes are supported. Default is None.
624            include (bool, default=True): 
625                If True, includes only the specified samples. If False, excludes the specified
626                samples. Default is True.
627            inplace (bool, default=False): 
628                If True, modifies `self` in place. If False, returns a new `SNPObject` with the samples 
629                filtered. Default is False.
630
631        Returns:
632            **Optional[SNPObject]:** 
633                A new `SNPObject` with the specified samples filtered if `inplace=False`. 
634                If `inplace=True`, modifies `self` in place and returns None.
635        """
636        if samples is None and indexes is None:
637            raise ValueError("At least one of 'samples' or 'indexes' must be provided.")
638
639        n_samples = self.n_samples
640        sample_names = np.array(self['samples'])
641
642        # Create mask based on sample names
643        if samples is not None:
644            samples = np.asarray(samples).ravel()
645            mask_samples = np.isin(sample_names, samples)
646            missing_samples = samples[~np.isin(samples, sample_names)]
647            if missing_samples.size > 0:
648                raise ValueError(f"The following specified samples were not found: {missing_samples.tolist()}")
649        else:
650            mask_samples = np.zeros(n_samples, dtype=bool)
651
652        # Create mask based on sample indexes
653        if indexes is not None:
654            indexes = np.asarray(indexes).ravel()
655
656            # Validate indexes, allowing negative indexes
657            out_of_bounds_indexes = indexes[(indexes < -n_samples) | (indexes >= n_samples)]
658            if out_of_bounds_indexes.size > 0:
659                raise ValueError(f"One or more sample indexes are out of bounds.")
660            
661            # Handle negative indexes
662            adjusted_indexes = np.mod(indexes, n_samples)
663
664            mask_indexes = np.zeros(n_samples, dtype=bool)
665            mask_indexes[adjusted_indexes] = True
666        else:
667            mask_indexes = np.zeros(n_samples, dtype=bool)
668
669        # Combine masks using logical OR (union of samples)
670        mask_combined = mask_samples | mask_indexes
671
672        if not include:
673            mask_combined = ~mask_combined
674
675        # If requested, compute an ordering of selected samples that follows the provided lists.
676        ordered_indices = None
677        if include and reorder:
678            sel_indices = np.where(mask_combined)[0]
679            ordered_list: List[int] = []
680            added = np.zeros(n_samples, dtype=bool)
681
682            # Prioritize the order in `samples`
683            if samples is not None:
684                name_to_idx = {name: idx for idx, name in enumerate(sample_names)}
685                for s in samples:
686                    idx = name_to_idx.get(s)
687                    if idx is not None and mask_combined[idx] and not added[idx]:
688                        ordered_list.append(idx)
689                        added[idx] = True
690
691            # Then respect the order in `indexes`
692            if indexes is not None:
693                adj_idx = np.mod(np.atleast_1d(indexes), n_samples)
694                for idx in adj_idx:
695                    if mask_combined[idx] and not added[idx]:
696                        ordered_list.append(int(idx))
697                        added[idx] = True
698
699            # Finally, append any remaining selected samples in their original order
700            for idx in sel_indices:
701                if not added[idx]:
702                    ordered_list.append(int(idx))
703
704            ordered_indices = np.asarray(ordered_list, dtype=int)
705
706        # Define keys to filter
707        keys = ['samples', 'calldata_gt', 'calldata_lai']
708
709        # Apply filtering based on inplace parameter
710        if inplace:
711            for key in keys:
712                if self[key] is not None:
713                    arr = np.asarray(self[key])
714                    if ordered_indices is not None:
715                        if key == 'calldata_lai' and arr.ndim == 2:
716                            # Haplotype-aware reordering for 2D LAI (n_snps, n_samples*2)
717                            hap_idx = np.concatenate([2*ordered_indices, 2*ordered_indices + 1])
718                            self[key] = arr[:, hap_idx]
719                        elif arr.ndim > 1:
720                            self[key] = arr[:, ordered_indices, ...]
721                        else:
722                            self[key] = arr[ordered_indices]
723                    else:
724                        if arr.ndim > 1:
725                            self[key] = arr[:, mask_combined, ...]
726                        else:
727                            self[key] = arr[mask_combined]
728
729            return None
730        else:
731            # Create A new `SNPObject` with filtered data
732            snpobj = self.copy()
733            for key in keys:
734                if snpobj[key] is not None:
735                    arr = np.asarray(snpobj[key])
736                    if ordered_indices is not None:
737                        if key == 'calldata_lai' and arr.ndim == 2:
738                            hap_idx = np.concatenate([2*ordered_indices, 2*ordered_indices + 1])
739                            snpobj[key] = arr[:, hap_idx]
740                        elif arr.ndim > 1:
741                            snpobj[key] = arr[:, ordered_indices, ...]
742                        else:
743                            snpobj[key] = arr[ordered_indices]
744                    else:
745                        if arr.ndim > 1:
746                            snpobj[key] = arr[:, mask_combined, ...]
747                        else:
748                            snpobj[key] = arr[mask_combined]
749            return snpobj

Filter samples based on specified names or indexes.

This method updates the samples and calldata_gt attributes to include or exclude the specified samples. The order of the samples is preserved. Set reorder=True to match the ordering of the provided samples and/or indexes lists when including.

If both samples and indexes are provided, any sample matching either a name in samples or an index in indexes will be included or excluded.

This method allows inclusion or exclusion of specific samples by their names or indexes. When both sample names and indexes are provided, the union of the specified samples is used. Negative indexes are supported and follow NumPy's indexing conventions.

Arguments:
  • samples (str or array_like of str, optional): Name(s) of the samples to include or exclude. Can be a single sample name or a sequence of sample names. Default is None.
  • indexes (int or array_like of int, optional): Index(es) of the samples to include or exclude. Can be a single index or a sequence of indexes. Negative indexes are supported. Default is None.
  • include (bool, default=True): If True, includes only the specified samples. If False, excludes the specified samples. Default is True.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with the samples filtered. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject with the specified samples filtered if inplace=False. If inplace=True, modifies self in place and returns None.

def detect_chromosome_format(self) -> str:
751    def detect_chromosome_format(self) -> str:
752        """
753        Detect the chromosome naming convention in `variants_chrom` based on the prefix 
754        of the first chromosome identifier in `unique_chrom`.
755        
756        **Recognized formats:**
757
758        - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.
759        - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.
760        - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.
761        - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
762        
763        If the format does not match any recognized pattern, `'Unknown format'` is returned.
764
765        Returns:
766            **str:** 
767                A string indicating the detected chromosome format (`'chr'`, `'chm'`, `'chrom'`, or `'plain'`).
768                If no recognized format is matched, returns `'Unknown format'`.
769        """
770        # Select the first unique chromosome identifier for format detection
771        chromosome_str = self.unique_chrom[0]
772
773        # Define regular expressions to match each recognized chromosome format
774        patterns = {
775            'chr': r'^chr(\d+|X|Y|M)$',    # Matches 'chr' prefixed format
776            'chm': r'^chm(\d+|X|Y|M)$',    # Matches 'chm' prefixed format
777            'chrom': r'^chrom(\d+|X|Y|M)$', # Matches 'chrom' prefixed format
778            'plain': r'^(\d+|X|Y|M)$'       # Matches plain format without prefix
779        }
780
781        # Iterate through the patterns to identify the chromosome format
782        for prefix, pattern in patterns.items():
783            if re.match(pattern, chromosome_str):
784                return prefix  # Return the recognized format prefix
785
786        # If no pattern matches, return 'Unknown format'
787        return 'Unknown format'

Detect the chromosome naming convention in variants_chrom based on the prefix of the first chromosome identifier in unique_chrom.

Recognized formats:

  • 'chr': Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.
  • 'chm': Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.
  • 'chrom': Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.
  • 'plain': Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.

If the format does not match any recognized pattern, 'Unknown format' is returned.

Returns:

str: A string indicating the detected chromosome format ('chr', 'chm', 'chrom', or 'plain'). If no recognized format is matched, returns 'Unknown format'.

def convert_chromosome_format( self, from_format: str, to_format: str, inplace: bool = False) -> ForwardRef('SNPObject') | None:
789    def convert_chromosome_format(
790        self, 
791        from_format: str, 
792        to_format: str, 
793        inplace: bool = False
794    ) -> Optional['SNPObject']:
795        """
796        Convert the chromosome format from one naming convention to another in `variants_chrom`.
797
798        **Supported formats:**
799
800        - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.
801        - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.
802        - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.
803        - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
804
805        Args:
806            from_format (str): 
807                The current chromosome format. Acceptable values are `'chr'`, `'chm'`, `'chrom'`, or `'plain'`.
808            to_format (str): 
809                The target format for chromosome data conversion. Acceptable values match `from_format` options.
810            inplace (bool, default=False): 
811                If True, modifies `self` in place. If False, returns a new `SNPObject` with the converted format.
812                Default is False.
813
814        Returns:
815            **Optional[SNPObject]:** A new `SNPObject` with the converted chromosome format if `inplace=False`. 
816            If `inplace=True`, modifies `self` in place and returns None.
817        """
818        # Define the list of standard chromosome identifiers
819        chrom_list = [*map(str, range(1, 23)), 'X', 'Y', 'M']  # M for mitochondrial chromosomes
820
821        # Format mappings for different chromosome naming conventions
822        format_mappings = {
823            'chr': [f'chr{i}' for i in chrom_list],
824            'chm': [f'chm{i}' for i in chrom_list],
825            'chrom': [f'chrom{i}' for i in chrom_list],
826            'plain': chrom_list,
827        }
828
829        # Verify that from_format and to_format are valid naming conventions
830        if from_format not in format_mappings or to_format not in format_mappings:
831            raise ValueError(f"Invalid format: {from_format} or {to_format}. Must be one of {list(format_mappings.keys())}.")
832
833        # Convert chromosomes to string for consistent comparison
834        variants_chrom = self['variants_chrom'].astype(str)
835
836        # Verify that all chromosomes in the object follow the specified `from_format`
837        expected_chroms = set(format_mappings[from_format])
838        mismatched_chroms = set(variants_chrom) - expected_chroms
839
840        if mismatched_chroms:
841            raise ValueError(f"The following chromosomes do not match the `from_format` '{from_format}': {mismatched_chroms}.")
842
843        # Create conditions for selecting based on current `from_format` names
844        conditions = [variants_chrom == chrom for chrom in format_mappings[from_format]]
845
846        # Rename chromosomes based on inplace flag
847        if inplace:
848            self['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown')
849            return None
850        else:
851            snpobject = self.copy()
852            snpobject['variants_chrom'] = np.select(conditions, format_mappings[to_format], default='unknown')
853            return snpobject

Convert the chromosome format from one naming convention to another in variants_chrom.

Supported formats:

  • 'chr': Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.
  • 'chm': Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.
  • 'chrom': Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.
  • 'plain': Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
Arguments:
  • from_format (str): The current chromosome format. Acceptable values are 'chr', 'chm', 'chrom', or 'plain'.
  • to_format (str): The target format for chromosome data conversion. Acceptable values match from_format options.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with the converted format. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject with the converted chromosome format if inplace=False. If inplace=True, modifies self in place and returns None.

def match_chromosome_format( self, snpobj: 'SNPObject', inplace: bool = False) -> ForwardRef('SNPObject') | None:
855    def match_chromosome_format(self, snpobj: 'SNPObject', inplace: bool = False) -> Optional['SNPObject']:
856        """
857        Convert the chromosome format in `variants_chrom` from `self` to match the format of a reference `snpobj`.
858
859        **Recognized formats:**
860
861        - `'chr'`: Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.
862        - `'chm'`: Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.
863        - `'chrom'`: Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.
864        - `'plain'`: Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
865
866        Args:
867            snpobj (SNPObject): 
868                The reference SNPObject whose chromosome format will be matched.
869            inplace (bool, default=False): 
870                If True, modifies `self` in place. If False, returns a new `SNPObject` with the 
871                chromosome format matching that of `snpobj`. Default is False.
872
873        Returns:
874            **Optional[SNPObject]:** 
875                A new `SNPObject` with matched chromosome format if `inplace=False`. 
876                If `inplace=True`, modifies `self` in place and returns None.
877        """
878        # Detect the chromosome naming format of the current SNPObject
879        fmt1 = self.detect_chromosome_format()
880        if fmt1 == 'Unknown format':
881            raise ValueError("The chromosome format of the current SNPObject is unrecognized.")
882        
883        # Detect the chromosome naming format of the reference SNPObject
884        fmt2 = snpobj.detect_chromosome_format()
885        if fmt2 == 'Unknown format':
886            raise ValueError("The chromosome format of the reference SNPObject is unrecognized.")
887
888        # Convert the current SNPObject's chromosome format to match the reference format
889        return self.convert_chromosome_format(fmt1, fmt2, inplace=inplace)

Convert the chromosome format in variants_chrom from self to match the format of a reference snpobj.

Recognized formats:

  • 'chr': Format with 'chr' prefix, e.g., 'chr1', 'chr2', ..., 'chrX', 'chrY', 'chrM'.
  • 'chm': Format with 'chm' prefix, e.g., 'chm1', 'chm2', ..., 'chmX', 'chmY', 'chmM'.
  • 'chrom': Format with 'chrom' prefix, e.g., 'chrom1', 'chrom2', ..., 'chromX', 'chromY', 'chromM'.
  • 'plain': Plain format without a prefix, e.g., '1', '2', ..., 'X', 'Y', 'M'.
Arguments:
  • snpobj (SNPObject): The reference SNPObject whose chromosome format will be matched.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with the chromosome format matching that of snpobj. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject with matched chromosome format if inplace=False. If inplace=True, modifies self in place and returns None.

def rename_chrom( self, to_replace: Dict[str, str] | str | List[str] = {'^([0-9]+)$': 'chr\\1', '^chr([0-9]+)$': '\\1'}, value: str | List[str] | None = None, regex: bool = True, inplace: bool = False) -> ForwardRef('SNPObject') | None:
891    def rename_chrom(
892        self,
893        to_replace: Union[Dict[str, str], str, List[str]] = {'^([0-9]+)$': r'chr\1', r'^chr([0-9]+)$': r'\1'},
894        value: Optional[Union[str, List[str]]] = None,
895        regex: bool = True,
896        inplace: bool = False
897    ) -> Optional['SNPObject']:
898        """
899        Replace chromosome values in `variants_chrom` using patterns or exact matches.
900
901        This method allows flexible chromosome replacements, using regex or exact matches, useful 
902        for non-standard chromosome formats. For standard conversions (e.g., 'chr1' to '1'), 
903        consider `convert_chromosome_format`.
904
905        Args:
906            to_replace (dict, str, or list of str): 
907                Pattern(s) or exact value(s) to be replaced in chromosome names. Default behavior 
908                transforms `<chrom_num>` to `chr<chrom_num>` or vice versa. Non-matching values 
909                remain unchanged.
910                - If str or list of str: Matches will be replaced with `value`.
911                - If regex (bool), then any regex matches will be replaced with `value`.
912                - If dict: Keys defines values to replace, with corresponding replacements as values.
913            value (str or list of str, optional): 
914                Replacement value(s) if `to_replace` is a string or list. Ignored if `to_replace` 
915                is a dictionary.
916            regex (bool, default=True): 
917                If True, interprets `to_replace` keys as regex patterns.
918            inplace (bool, default=False): 
919                If True, modifies `self` in place. If False, returns a new `SNPObject` with the chromosomes
920                renamed. Default is False.
921
922        Returns:
923            **Optional[SNPObject]:** A new `SNPObject` with the renamed chromosome format if `inplace=False`. 
924            If `inplace=True`, modifies `self` in place and returns None.
925        """
926        # Standardize input format: convert `to_replace` and `value` to a dictionary if needed
927        if isinstance(to_replace, (str, int)):
928            to_replace = [to_replace]
929        if isinstance(value, (str, int)):
930            value = [value]
931        if isinstance(to_replace, list) and isinstance(value, list):
932            dictionary = dict(zip(to_replace, value))
933        elif isinstance(to_replace, dict) and value is None:
934            dictionary = to_replace
935        else:
936            raise ValueError(
937            "Invalid input: `to_replace` and `value` must be compatible types (both str, list of str, or dict)."
938        )
939
940        # Vectorized function for replacing values in chromosome array
941        vec_replace_values = np.vectorize(self._match_to_replace)
942
943        # Rename chromosomes based on inplace flag
944        if inplace:
945            self.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex)
946            return None
947        else:
948            snpobj = self.copy()
949            snpobj.variants_chrom = vec_replace_values(self.variants_chrom, dictionary, regex)
950            return snpobj

Replace chromosome values in variants_chrom using patterns or exact matches.

This method allows flexible chromosome replacements, using regex or exact matches, useful for non-standard chromosome formats. For standard conversions (e.g., 'chr1' to '1'), consider convert_chromosome_format.

Arguments:
  • to_replace (dict, str, or list of str): Pattern(s) or exact value(s) to be replaced in chromosome names. Default behavior transforms <chrom_num> to chr<chrom_num> or vice versa. Non-matching values remain unchanged.
    • If str or list of str: Matches will be replaced with value.
    • If regex (bool), then any regex matches will be replaced with value.
    • If dict: Keys defines values to replace, with corresponding replacements as values.
  • value (str or list of str, optional): Replacement value(s) if to_replace is a string or list. Ignored if to_replace is a dictionary.
  • regex (bool, default=True): If True, interprets to_replace keys as regex patterns.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with the chromosomes renamed. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject with the renamed chromosome format if inplace=False. If inplace=True, modifies self in place and returns None.

def rename_missings( self, before: int | float | str = -1, after: int | float | str = '.', inplace: bool = False) -> ForwardRef('SNPObject') | None:
952    def rename_missings(
953        self, 
954        before: Union[int, float, str] = -1, 
955        after: Union[int, float, str] = '.', 
956        inplace: bool = False
957    ) -> Optional['SNPObject']:
958        """
959        Replace missing values in the `calldata_gt` attribute.
960
961        This method identifies missing values in 'calldata_gt' and replaces them with a specified 
962        value. By default, it replaces occurrences of `-1` (often used to signify missing data) with `'.'`.
963
964        Args:
965            before (int, float, or str, default=-1): 
966                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
967                Default is -1.
968            after (int, float, or str, default='.'): 
969                The value that will replace `before`. Default is '.'.
970            inplace (bool, default=False): 
971                If True, modifies `self` in place. If False, returns a new `SNPObject` with the applied 
972                replacements. Default is False.
973
974        Returns:
975            **Optional[SNPObject]:** 
976                A new `SNPObject` with the renamed missing values if `inplace=False`. 
977                If `inplace=True`, modifies `self` in place and returns None.
978        """
979        # Rename missing values in the `calldata_gt` attribute based on inplace flag
980        if inplace:
981            self['calldata_gt'] = np.where(self['calldata_gt'] == before, after, self['calldata_gt'])
982            return None
983        else:
984            snpobj = self.copy()
985            snpobj['calldata_gt'] = np.where(snpobj['calldata_gt'] == before, after, snpobj['calldata_gt'])
986            return snpobj

Replace missing values in the calldata_gt attribute.

This method identifies missing values in 'calldata_gt' and replaces them with a specified value. By default, it replaces occurrences of -1 (often used to signify missing data) with '.'.

Arguments:
  • before (int, float, or str, default=-1): The current representation of missing values in calldata_gt. Common values might be -1, '.', or NaN. Default is -1.
  • after (int, float, or str, default='.'): The value that will replace before. Default is '.'.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with the applied replacements. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject with the renamed missing values if inplace=False. If inplace=True, modifies self in place and returns None.

def get_common_variants_intersection( self, snpobj: 'SNPObject', index_by: str = 'pos') -> Tuple[List[str], numpy.ndarray, numpy.ndarray]:
 988    def get_common_variants_intersection(
 989        self, 
 990        snpobj: 'SNPObject', 
 991        index_by: str = 'pos'
 992    ) -> Tuple[List[str], np.ndarray, np.ndarray]:
 993        """
 994        Identify common variants between `self` and the `snpobj` instance based on the specified `index_by` criterion, 
 995        which may match based on chromosome and position (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both.
 996
 997        This method returns the identifiers of common variants and their corresponding indices in both objects.
 998
 999        Args:
1000            snpobj (SNPObject): 
1001                The reference SNPObject to compare against.
1002            index_by (str, default='pos'): 
1003                Criteria for matching variants. Options:
1004                - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'.
1005                - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'.
1006                - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'.
1007                Default is 'pos'.
1008
1009        Returns:
1010            Tuple containing:
1011            - **list of str:** A list of common variant identifiers (as strings).
1012            - **array:** An array of indices in `self` where common variants are located.
1013            - **array:** An array of indices in `snpobj` where common variants are located.
1014        """
1015        # Create unique identifiers for each variant in both SNPObjects based on the specified criterion
1016        if index_by == 'pos':
1017            query_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(self['variants_chrom'], self['variants_pos'])]
1018            reference_identifiers = [f"{chrom}-{pos}" for chrom, pos in zip(snpobj['variants_chrom'], snpobj['variants_pos'])]
1019        elif index_by == 'id':
1020            query_identifiers = self['variants_id'].tolist()
1021            reference_identifiers = snpobj['variants_id'].tolist()
1022        elif index_by == 'pos+id':
1023            query_identifiers = [
1024                f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(self['variants_chrom'], self['variants_pos'], self['variants_id'])
1025            ]
1026            reference_identifiers = [
1027                f"{chrom}-{pos}-{ids}" for chrom, pos, ids in zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_id'])
1028            ]
1029        else:
1030            raise ValueError("`index_by` must be one of 'pos', 'id', or 'pos+id'.")
1031
1032        # Convert to sets for intersection
1033        common_ids = set(query_identifiers).intersection(reference_identifiers)
1034
1035        # Collect indices for common identifiers
1036        query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids]
1037        reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids]
1038
1039        return list(common_ids), np.array(query_idx), np.array(reference_idx)

Identify common variants between self and the snpobj instance based on the specified index_by criterion, which may match based on chromosome and position (variants_chrom, variants_pos), ID (variants_id), or both.

This method returns the identifiers of common variants and their corresponding indices in both objects.

Arguments:
  • snpobj (SNPObject): The reference SNPObject to compare against.
  • index_by (str, default='pos'): Criteria for matching variants. Options:
Returns:

Tuple containing:

  • list of str: A list of common variant identifiers (as strings).
  • array: An array of indices in self where common variants are located.
  • array: An array of indices in snpobj where common variants are located.
def get_common_markers_intersection( self, snpobj: 'SNPObject') -> Tuple[List[str], numpy.ndarray, numpy.ndarray]:
1041    def get_common_markers_intersection(
1042        self, 
1043        snpobj: 'SNPObject'
1044    ) -> Tuple[List[str], np.ndarray, np.ndarray]:
1045        """
1046        Identify common markers between between `self` and the `snpobj` instance. Common markers are identified 
1047        based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 
1048        and alternate (`variants_alt`) alleles.
1049
1050        This method returns the identifiers of common markers and their corresponding indices in both objects.
1051        
1052        Args:
1053            snpobj (SNPObject): 
1054                The reference SNPObject to compare against.
1055        
1056        Returns:
1057            Tuple containing:
1058            - **list of str:** A list of common variant identifiers (as strings).
1059            - **array:** An array of indices in `self` where common variants are located.
1060            - **array:** An array of indices in `snpobj` where common variants are located.
1061        """
1062        # Generate unique identifiers based on chrom, pos, ref, and alt alleles
1063        query_identifiers = [
1064            f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 
1065            zip(self['variants_chrom'], self['variants_pos'], self['variants_ref'], self['variants_alt'])
1066        ]
1067        reference_identifiers = [
1068            f"{chrom}-{pos}-{ref}-{alt}" for chrom, pos, ref, alt in 
1069            zip(snpobj['variants_chrom'], snpobj['variants_pos'], snpobj['variants_ref'], snpobj['variants_alt'])
1070        ]
1071
1072        # Convert to sets for intersection
1073        common_ids = set(query_identifiers).intersection(reference_identifiers)
1074
1075        # Collect indices for common identifiers in both SNPObjects
1076        query_idx = [i for i, id in enumerate(query_identifiers) if id in common_ids]
1077        reference_idx = [i for i, id in enumerate(reference_identifiers) if id in common_ids]
1078
1079        return list(common_ids), np.array(query_idx), np.array(reference_idx)

Identify common markers between between self and the snpobj instance. Common markers are identified based on matching chromosome (variants_chrom), position (variants_pos), reference (variants_ref), and alternate (variants_alt) alleles.

This method returns the identifiers of common markers and their corresponding indices in both objects.

Arguments:
  • snpobj (SNPObject): The reference SNPObject to compare against.
Returns:

Tuple containing:

  • list of str: A list of common variant identifiers (as strings).
  • array: An array of indices in self where common variants are located.
  • array: An array of indices in snpobj where common variants are located.
def subset_to_common_variants( self, snpobj: 'SNPObject', index_by: str = 'pos', common_variants_intersection: Tuple[numpy.ndarray, numpy.ndarray] | None = None, inplace: bool = False) -> ForwardRef('SNPObject') | None:
1081    def subset_to_common_variants(
1082        self, 
1083        snpobj: 'SNPObject', 
1084        index_by: str = 'pos', 
1085        common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None,
1086        inplace: bool = False
1087    ) -> Optional['SNPObject']:
1088        """
1089        Subset `self` to include only the common variants with a reference `snpobj` based on 
1090        the specified `index_by` criterion, which may match based on chromosome and position 
1091        (`variants_chrom`, `variants_pos`), ID (`variants_id`), or both.
1092        
1093        Args:
1094            snpobj (SNPObject): 
1095                The reference SNPObject to compare against.
1096            index_by (str, default='pos'): 
1097                Criteria for matching variants. Options:
1098                - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'.
1099                - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'.
1100                - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'.
1101                Default is 'pos'.
1102            common_variants_intersection (Tuple[np.ndarray, np.ndarray], optional): 
1103                Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 
1104                computed within the function.
1105            inplace (bool, default=False): 
1106                If True, modifies `self` in place. If False, returns a new `SNPObject` with the common variants
1107                subsetted. Default is False.
1108
1109        Returns:
1110            **Optional[SNPObject]:** 
1111                A new `SNPObject` with the common variants subsetted if `inplace=False`. 
1112                If `inplace=True`, modifies `self` in place and returns None.
1113        """
1114        # Get indices of common variants if not provided
1115        if common_variants_intersection is None:
1116            _, query_idx, _ = self.get_common_variants_intersection(snpobj, index_by=index_by)
1117        else:
1118            query_idx, _ = common_variants_intersection
1119
1120        # Use filter_variants method with the identified indices, applying `inplace` as specified
1121        return self.filter_variants(indexes=query_idx, include=True, inplace=inplace)

Subset self to include only the common variants with a reference snpobj based on the specified index_by criterion, which may match based on chromosome and position (variants_chrom, variants_pos), ID (variants_id), or both.

Arguments:
  • snpobj (SNPObject): The reference SNPObject to compare against.
  • index_by (str, default='pos'): Criteria for matching variants. Options:
  • common_variants_intersection (Tuple[np.ndarray, np.ndarray], optional): Precomputed indices of common variants between self and snpobj. If None, intersection is computed within the function.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with the common variants subsetted. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject with the common variants subsetted if inplace=False. If inplace=True, modifies self in place and returns None.

def subset_to_common_markers( self, snpobj: 'SNPObject', common_markers_intersection: Tuple[numpy.ndarray, numpy.ndarray] | None = None, inplace: bool = False) -> ForwardRef('SNPObject') | None:
1123    def subset_to_common_markers(
1124        self, 
1125        snpobj: 'SNPObject', 
1126        common_markers_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None,
1127        inplace: bool = False
1128    ) -> Optional['SNPObject']:
1129        """
1130        Subset `self` to include only the common markers with a reference `snpobj`. Common markers are identified 
1131        based on matching chromosome (`variants_chrom`), position (`variants_pos`), reference (`variants_ref`), 
1132        and alternate (`variants_alt`) alleles.
1133
1134        Args:
1135            snpobj (SNPObject): 
1136                The reference SNPObject to compare against.
1137            common_markers_intersection (tuple of arrays, optional): 
1138                Precomputed indices of common markers between `self` and `snpobj`. If None, intersection is 
1139                computed within the function.
1140            inplace (bool, default=False): 
1141                If True, modifies `self` in place. If False, returns a new `SNPObject` with the common markers
1142                subsetted. Default is False.
1143
1144        Returns:
1145            **Optional[SNPObject]:** 
1146                A new `SNPObject` with the common markers subsetted if `inplace=False`. 
1147                If `inplace=True`, modifies `self` in place and returns None.
1148        """
1149        # Get indices of common markers if not provided
1150        if common_markers_intersection is None:
1151            _, query_idx, _ = self.get_common_markers_intersection(snpobj)
1152        else:
1153            query_idx, _ = common_markers_intersection
1154
1155        # Use filter_variants method with the identified indices, applying `inplace` as specified
1156        return self.filter_variants(indexes=query_idx, include=True, inplace=inplace)

Subset self to include only the common markers with a reference snpobj. Common markers are identified based on matching chromosome (variants_chrom), position (variants_pos), reference (variants_ref), and alternate (variants_alt) alleles.

Arguments:
  • snpobj (SNPObject): The reference SNPObject to compare against.
  • common_markers_intersection (tuple of arrays, optional): Precomputed indices of common markers between self and snpobj. If None, intersection is computed within the function.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with the common markers subsetted. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject with the common markers subsetted if inplace=False. If inplace=True, modifies self in place and returns None.

def merge( self, snpobj: 'SNPObject', force_samples: bool = False, prefix: str = '2', inplace: bool = False) -> ForwardRef('SNPObject') | None:
1158    def merge(
1159            self, 
1160            snpobj: 'SNPObject', 
1161            force_samples: bool = False, 
1162            prefix: str = '2', 
1163            inplace: bool = False
1164        ) -> Optional['SNPObject']:
1165        """
1166        Merge `self` with `snpobj` along the sample axis.
1167
1168        This method expects both SNPObjects to contain the same set of SNPs in the same order, 
1169        then combines their genotype (`calldata_gt`) and LAI (`calldata_lai`) arrays by 
1170        concatenating the sample dimension. Samples from `snpobj` are appended to those in `self`.
1171
1172        Args:
1173            snpobj (SNPObject): 
1174                The SNPObject to merge samples with.
1175            force_samples (bool, default=False): 
1176                If True, duplicate sample names are resolved by prepending the `prefix` to duplicate sample names in 
1177                `snpobj`. Otherwise, merging fails when duplicate sample names are found. Default is False.
1178            prefix (str, default='2'): 
1179                A string prepended to duplicate sample names in `snpobj` when `force_samples=True`. 
1180                Duplicates are renamed from `<sample_name>` to `<prefix>:<sample_name>`. For instance, 
1181                if `prefix='2'` and there is a conflict with a sample called "sample_1", it becomes "2:sample_1".
1182            inplace (bool, default=False): 
1183                If True, modifies `self` in place. If False, returns a new `SNPObject` with the merged samples. 
1184                Default is False.
1185
1186        Returns:
1187            **Optional[SNPObject]**: A new SNPObject containing the merged sample data.
1188        """
1189        # Merge calldata_gt if present and compatible
1190        if self.calldata_gt is not None and snpobj.calldata_gt is not None:
1191            if self.calldata_gt.shape[0] != snpobj.calldata_gt.shape[0]:
1192                raise ValueError(
1193                    f"Cannot merge SNPObjects: Mismatch in the number of SNPs in `calldata_gt`.\n"
1194                    f"`self.calldata_gt` has {self.calldata_gt.shape[0]} SNPs, "
1195                    f"while `snpobj.calldata_gt` has {snpobj.calldata_gt.shape[0]} SNPs."
1196                )
1197            if self.are_strands_summed and not snpobj.are_strands_summed:
1198                raise ValueError(
1199                    "Cannot merge SNPObjects: `self` has summed strands, but `snpobj` does not.\n"
1200                    "Ensure both objects have the same genotype summation state before merging."
1201                )
1202            if not self.are_strands_summed and snpobj.are_strands_summed:
1203                raise ValueError(
1204                    "Cannot merge SNPObjects: `snpobj` has summed strands, but `self` does not.\n"
1205                    "Ensure both objects have the same genotype summation state before merging."
1206                )
1207            calldata_gt = np.concatenate([self.calldata_gt, snpobj.calldata_gt], axis=1)
1208        else:
1209            calldata_gt = None
1210
1211        # Merge samples if present and compatible, handling duplicates if `force_samples=True`
1212        if self.samples is not None and snpobj.samples is not None:
1213            overlapping_samples = set(self.samples).intersection(set(snpobj.samples))
1214            if overlapping_samples:
1215                if not force_samples:
1216                    raise ValueError(
1217                        f"Cannot merge SNPObjects: Found overlapping sample names {overlapping_samples}.\n"
1218                        "Samples must be strictly non-overlapping. To allow merging with renaming, set `force_samples=True`."
1219                    )
1220                else:
1221                    # Rename duplicate samples by prepending the file index
1222                    renamed_samples = [f"{prefix}:{sample}" if sample in overlapping_samples else sample for sample in snpobj.samples]
1223                    samples = np.concatenate([self.samples, renamed_samples], axis=0)
1224            else:
1225                samples = np.concatenate([self.samples, snpobj.samples], axis=0)
1226        else:
1227            samples = None
1228
1229        # Merge LAI data if present and compatible
1230        if self.calldata_lai is not None and snpobj.calldata_lai is not None:
1231            if self.calldata_lai.ndim != snpobj.calldata_lai.ndim:
1232                raise ValueError(
1233                    f"Cannot merge SNPObjects: Mismatch in `calldata_lai` dimensions.\n"
1234                    f"`self.calldata_lai` has {self.calldata_lai.ndim} dimensions, "
1235                    f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.ndim} dimensions."
1236                )
1237            if self.calldata_lai.shape[0] != snpobj.calldata_lai.shape[0]:
1238                raise ValueError(
1239                    f"Cannot merge SNPObjects: Mismatch in the number of SNPs in `calldata_lai`.\n"
1240                    f"`self.calldata_lai` has {self.calldata_lai.shape[0]} SNPs, "
1241                    f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.shape[0]} SNPs."
1242                )
1243            calldata_lai = np.concatenate([self.calldata_lai, snpobj.calldata_lai], axis=1)
1244        else:
1245            calldata_lai = None
1246
1247        if inplace:
1248            self.calldata_gt = calldata_gt
1249            self.calldata_lai = calldata_lai
1250            self.samples = samples
1251            return self
1252
1253        # Create and return a new SNPObject containing the merged samples
1254        return SNPObject(
1255            calldata_gt=calldata_gt,
1256            samples=samples,
1257            variants_ref=self.variants_ref,
1258            variants_alt=self.variants_alt,
1259            variants_chrom=self.variants_chrom,
1260            variants_filter_pass=self.variants_filter_pass,
1261            variants_id=self.variants_id,
1262            variants_pos=self.variants_pos,
1263            variants_qual=self.variants_qual,
1264            calldata_lai=calldata_lai,
1265            ancestry_map=self.ancestry_map
1266        )

Merge self with snpobj along the sample axis.

This method expects both SNPObjects to contain the same set of SNPs in the same order, then combines their genotype (calldata_gt) and LAI (calldata_lai) arrays by concatenating the sample dimension. Samples from snpobj are appended to those in self.

Arguments:
  • snpobj (SNPObject): The SNPObject to merge samples with.
  • force_samples (bool, default=False): If True, duplicate sample names are resolved by prepending the prefix to duplicate sample names in snpobj. Otherwise, merging fails when duplicate sample names are found. Default is False.
  • prefix (str, default='2'): A string prepended to duplicate sample names in snpobj when force_samples=True. Duplicates are renamed from <sample_name> to <prefix>:<sample_name>. For instance, if prefix='2' and there is a conflict with a sample called "sample_1", it becomes "2:sample_1".
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with the merged samples. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject containing the merged sample data.

def concat( self, snpobj: 'SNPObject', inplace: bool = False) -> ForwardRef('SNPObject') | None:
1268    def concat(
1269        self,
1270        snpobj: 'SNPObject', 
1271        inplace: bool = False
1272    ) -> Optional['SNPObject']:
1273        """
1274        Concatenate self with snpobj along the SNP axis.
1275
1276        This method expects both SNPObjects to contain the same set of samples in the same order, 
1277        and that the chromosome(s) in snpobj follow (i.e. have higher numeric identifiers than) 
1278        those in self.
1279
1280        Args:
1281            snpobj (SNPObject):
1282                The SNPObject to concatenate SNPs with.
1283            inplace (bool, default=False):
1284                If True, modifies `self` in place. If False, returns a new `SNPObject` with the concatenated SNPs. 
1285                Default is False.
1286        
1287        Returns:
1288            **Optional[SNPObject]**: A new SNPObject containing the concatenated SNP data.
1289        """
1290        # Merge calldata_gt if present and compatible
1291        if self.calldata_gt is not None and snpobj.calldata_gt is not None:
1292            if self.calldata_gt.shape[1] != snpobj.calldata_gt.shape[1]:
1293                raise ValueError(
1294                    f"Cannot merge SNPObjects: Mismatch in the number of samples in `calldata_gt`.\n"
1295                    f"`self.calldata_gt` has {self.calldata_gt.shape[1]} samples, "
1296                    f"while `snpobj.calldata_gt` has {snpobj.calldata_gt.shape[1]} samples."
1297                )
1298            if self.are_strands_summed and not snpobj.are_strands_summed:
1299                raise ValueError(
1300                    "Cannot merge SNPObjects: `self` has summed strands, but `snpobj` does not.\n"
1301                    "Ensure both objects have the same genotype summation state before merging."
1302                )
1303            if not self.are_strands_summed and snpobj.are_strands_summed:
1304                raise ValueError(
1305                    "Cannot merge SNPObjects: `snpobj` has summed strands, but `self` does not.\n"
1306                    "Ensure both objects have the same genotype summation state before merging."
1307                )
1308            calldata_gt = np.concatenate([self.calldata_gt, snpobj.calldata_gt], axis=0)
1309        else:
1310            calldata_gt = None
1311
1312        # Merge SNP-related attributes if present
1313        attributes = [
1314            'variants_ref', 'variants_alt', 'variants_chrom', 'variants_filter_pass', 'variants_id', 'variants_pos', 'variants_qual'
1315        ]
1316        merged_attrs = {}
1317        for attr in attributes:
1318            self_attr = getattr(self, attr, None)
1319            obj_attr = getattr(snpobj, attr, None)
1320
1321            # Concatenate if both present
1322            if self_attr is not None and obj_attr is not None:
1323                merged_attrs[attr] = np.concatenate([self_attr, obj_attr], axis=0)
1324            else:
1325                # If either is None, store None
1326                merged_attrs[attr] = None
1327
1328        # Merge LAI data if present and compatible
1329        if self.calldata_lai is not None and snpobj.calldata_lai is not None:
1330            if self.calldata_lai.ndim != snpobj.calldata_lai.ndim:
1331                raise ValueError(
1332                    f"Cannot merge SNPObjects: Mismatch in `calldata_lai` dimensions.\n"
1333                    f"`self.calldata_lai` has {self.calldata_lai.ndim} dimensions, "
1334                    f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.ndim} dimensions."
1335                )
1336            if self.calldata_lai.shape[1] != snpobj.calldata_lai.shape[1]:
1337                raise ValueError(
1338                    f"Cannot merge SNPObjects: Mismatch in the number of samples in `calldata_lai`.\n"
1339                    f"`self.calldata_lai` has {self.calldata_lai.shape[1]} samples, "
1340                    f"while `snpobj.calldata_lai` has {snpobj.calldata_lai.shape[1]} samples."
1341                )
1342            calldata_lai = np.concatenate([self.calldata_lai, snpobj.calldata_lai], axis=0)
1343        else:
1344            calldata_lai = None
1345        
1346        if inplace:
1347            self.calldata_gt = calldata_gt
1348            self.calldata_lai = calldata_lai
1349            for attr in attributes:
1350                self[attr] = merged_attrs[attr]
1351            return self
1352        
1353        # Create and return a new SNPObject containing the concatenated SNPs
1354        return SNPObject(
1355            calldata_gt=calldata_gt,
1356            calldata_lai=calldata_lai,
1357            samples=self.samples,
1358            variants_ref=merged_attrs['variants_ref'],
1359            variants_alt=merged_attrs['variants_alt'],
1360            variants_chrom=merged_attrs['variants_chrom'],
1361            variants_id=merged_attrs['variants_id'],
1362            variants_pos=merged_attrs['variants_pos'],
1363            variants_qual=merged_attrs['variants_qual'],
1364            variants_filter_pass=merged_attrs['variants_filter_pass'],
1365            ancestry_map=self.ancestry_map
1366        )

Concatenate self with snpobj along the SNP axis.

This method expects both SNPObjects to contain the same set of samples in the same order, and that the chromosome(s) in snpobj follow (i.e. have higher numeric identifiers than) those in self.

Arguments:
  • snpobj (SNPObject): The SNPObject to concatenate SNPs with.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with the concatenated SNPs. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject containing the concatenated SNP data.

def remove_strand_ambiguous_variants(self, inplace: bool = False) -> ForwardRef('SNPObject') | None:
1368    def remove_strand_ambiguous_variants(self, inplace: bool = False) -> Optional['SNPObject']:
1369        """
1370        A strand-ambiguous variant has reference (`variants_ref`) and alternate (`variants_alt`) alleles 
1371        in the pairs A/T, T/A, C/G, or G/C, where both alleles are complementary and thus indistinguishable 
1372        in terms of strand orientation.
1373
1374        Args:
1375            inplace (bool, default=False): 
1376                If True, modifies `self` in place. If False, returns a new `SNPObject` with the 
1377                strand-ambiguous variants removed. Default is False.
1378
1379        Returns:
1380            **Optional[SNPObject]:** A new `SNPObject` with non-ambiguous variants only if `inplace=False`. 
1381            If `inplace=True`, modifies `self` in place and returns None.
1382        """
1383        # Identify strand-ambiguous SNPs using vectorized comparisons
1384        is_AT = (self['variants_ref'] == 'A') & (self['variants_alt'] == 'T')
1385        is_TA = (self['variants_ref'] == 'T') & (self['variants_alt'] == 'A')
1386        is_CG = (self['variants_ref'] == 'C') & (self['variants_alt'] == 'G')
1387        is_GC = (self['variants_ref'] == 'G') & (self['variants_alt'] == 'C')
1388
1389        # Create a combined mask for all ambiguous variants
1390        ambiguous_mask = is_AT | is_TA | is_CG | is_GC
1391        non_ambiguous_idx = np.where(~ambiguous_mask)[0]
1392
1393        # Count each type of ambiguity using numpy's sum on boolean arrays
1394        A_T_count = np.sum(is_AT)
1395        T_A_count = np.sum(is_TA)
1396        C_G_count = np.sum(is_CG)
1397        G_C_count = np.sum(is_GC)
1398
1399        # Log the counts of each type of strand-ambiguous variants
1400        total_ambiguous = A_T_count + T_A_count + C_G_count + G_C_count
1401        log.info(f'{A_T_count} ambiguities of A-T type.')
1402        log.info(f'{T_A_count} ambiguities of T-A type.')
1403        log.info(f'{C_G_count} ambiguities of C-G type.')
1404        log.info(f'{G_C_count} ambiguities of G-C type.')
1405
1406        # Filter out ambiguous variants and keep non-ambiguous ones
1407        log.debug(f'Removing {total_ambiguous} strand-ambiguous variants...')
1408        return self.filter_variants(indexes=non_ambiguous_idx, include=True, inplace=inplace)

A strand-ambiguous variant has reference (variants_ref) and alternate (variants_alt) alleles in the pairs A/T, T/A, C/G, or G/C, where both alleles are complementary and thus indistinguishable in terms of strand orientation.

Arguments:
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with the strand-ambiguous variants removed. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject with non-ambiguous variants only if inplace=False. If inplace=True, modifies self in place and returns None.

def correct_flipped_variants( self, snpobj: 'SNPObject', check_complement: bool = True, index_by: str = 'pos', common_variants_intersection: Tuple[numpy.ndarray, numpy.ndarray] | None = None, log_stats: bool = True, inplace: bool = False) -> ForwardRef('SNPObject') | None:
1410    def correct_flipped_variants(
1411        self, 
1412        snpobj: 'SNPObject', 
1413        check_complement: bool = True, 
1414        index_by: str = 'pos', 
1415        common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None,
1416        log_stats: bool = True,
1417        inplace: bool = False
1418    ) -> Optional['SNPObject']:
1419        """
1420        Correct flipped variants between between `self` and a reference `snpobj`, where reference (`variants_ref`) 
1421        and alternate (`variants_alt`) alleles are swapped.
1422
1423        **Flip Detection Based on `check_complement`:**
1424
1425        - If `check_complement=False`, only direct allele swaps are considered:
1426            1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`.
1427
1428        - If `check_complement=True`, both direct and complementary swaps are considered, with four possible cases:
1429            1. **Direct Swap:** `self.variants_ref == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`.
1430            2. **Complement Swap of Ref:** `complement(self.variants_ref) == snpobj.variants_alt` and `self.variants_alt == snpobj.variants_ref`.
1431            3. **Complement Swap of Alt:** `self.variants_ref == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`.
1432            4. **Complement Swap of both Ref and Alt:** `complement(self.variants_ref) == snpobj.variants_alt` and `complement(self.variants_alt) == snpobj.variants_ref`.
1433
1434        **Note:** Variants where `self.variants_ref == self.variants_alt` are ignored as they are ambiguous.
1435
1436        **Correction Process:** 
1437        - Swaps `variants_ref` and `variants_alt` alleles in `self` to align with `snpobj`.
1438        - Flips `calldata_gt` values (0 becomes 1, and 1 becomes 0) to match the updated allele configuration.
1439
1440        Args:
1441            snpobj (SNPObject): 
1442                The reference SNPObject to compare against.
1443            check_complement (bool, default=True): 
1444                If True, also checks for complementary base pairs (A/T, T/A, C/G, and G/C) when identifying swapped variants.
1445                Default is True.
1446            index_by (str, default='pos'): 
1447                Criteria for matching variants. Options:
1448                - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'.
1449                - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'.
1450                - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'.
1451                Default is 'pos'.
1452            common_variants_intersection (tuple of arrays, optional): 
1453                Precomputed indices of common variants between `self` and `snpobj`. If None, intersection is 
1454                computed within the function.
1455            log_stats (bool, default=True): 
1456                If True, logs statistical information about matching and ambiguous alleles. Default is True.
1457            inplace (bool, default=False): 
1458                If True, modifies `self` in place. If False, returns a new `SNPObject` with corrected 
1459                flips. Default is False.
1460
1461        Returns:
1462            **Optional[SNPObject]**: 
1463                A new `SNPObject` with corrected flips if `inplace=False`. 
1464                If `inplace=True`, modifies `self` in place and returns None.
1465        """
1466        # Define complement mappings for nucleotides
1467        complement_map = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'}
1468
1469        # Helper function to get the complement of a base
1470        def get_complement(base: str) -> str:
1471            return complement_map.get(base, base)
1472
1473        # Get common variant indices if not provided
1474        if common_variants_intersection != None:
1475            query_idx, reference_idx = common_variants_intersection
1476        else:
1477            _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by)
1478
1479        # Log statistics on matching alleles if enabled
1480        if log_stats:
1481            matching_ref = np.sum(self['variants_ref'][query_idx] == snpobj['variants_ref'][reference_idx])
1482            matching_alt = np.sum(self['variants_alt'][query_idx] == snpobj['variants_alt'][reference_idx])
1483            ambiguous = np.sum(self['variants_ref'][query_idx] == self['variants_alt'][query_idx])
1484            log.info(f"Matching reference alleles (ref=ref'): {matching_ref}, Matching alternate alleles (alt=alt'): {matching_alt}.")
1485            log.info(f"Number of ambiguous alleles (ref=alt): {ambiguous}.")
1486
1487        # Identify indices where `ref` and `alt` alleles are swapped
1488        if not check_complement:
1489            # Simple exact match for swapped alleles
1490            swapped_ref = (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx])
1491            swapped_alt = (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx])
1492        else:
1493            # Check for swapped or complementary-swapped alleles
1494            swapped_ref = (
1495                (self['variants_ref'][query_idx] == snpobj['variants_alt'][reference_idx]) |
1496                (np.vectorize(get_complement)(self['variants_ref'][query_idx]) == snpobj['variants_alt'][reference_idx])
1497            )
1498            swapped_alt = (
1499                (self['variants_alt'][query_idx] == snpobj['variants_ref'][reference_idx]) |
1500                (np.vectorize(get_complement)(self['variants_alt'][query_idx]) == snpobj['variants_ref'][reference_idx])
1501            )
1502
1503        # Filter out ambiguous variants where `ref` and `alt` alleles match (ref=alt)
1504        not_ambiguous = (self['variants_ref'][query_idx] != self['variants_alt'][query_idx])
1505
1506        # Indices in `self` of flipped variants
1507        flip_idx_query = query_idx[swapped_ref & swapped_alt & not_ambiguous]
1508
1509        # Correct the identified variant flips
1510        if len(flip_idx_query) > 0:
1511            log.info(f'Correcting {len(flip_idx_query)} variant flips...')
1512
1513            temp_alts = self['variants_alt'][flip_idx_query]
1514            temp_refs = self['variants_ref'][flip_idx_query]
1515
1516            # Correct the variant flips based on whether the operation is in-place or not
1517            if inplace:
1518                self['variants_alt'][flip_idx_query] = temp_refs
1519                self['variants_ref'][flip_idx_query] = temp_alts
1520                self['calldata_gt'][flip_idx_query] = 1 - self['calldata_gt'][flip_idx_query]
1521                return None
1522            else:
1523                snpobj = self.copy()
1524                snpobj['variants_alt'][flip_idx_query] = temp_refs
1525                snpobj['variants_ref'][flip_idx_query] = temp_alts
1526                snpobj['calldata_gt'][flip_idx_query] = 1 - snpobj['calldata_gt'][flip_idx_query]
1527                return snpobj
1528        else:
1529            log.info('No variant flips found to correct.')
1530            return self if not inplace else None

Correct flipped variants between between self and a reference snpobj, where reference (variants_ref) and alternate (variants_alt) alleles are swapped.

Flip Detection Based on check_complement:

  • If check_complement=False, only direct allele swaps are considered:

    1. Direct Swap: self.variants_ref == snpobj.variants_alt and self.variants_alt == snpobj.variants_ref.
  • If check_complement=True, both direct and complementary swaps are considered, with four possible cases:

    1. Direct Swap: self.variants_ref == snpobj.variants_alt and self.variants_alt == snpobj.variants_ref.
    2. Complement Swap of Ref: complement(self.variants_ref) == snpobj.variants_alt and self.variants_alt == snpobj.variants_ref.
    3. Complement Swap of Alt: self.variants_ref == snpobj.variants_alt and complement(self.variants_alt) == snpobj.variants_ref.
    4. Complement Swap of both Ref and Alt: complement(self.variants_ref) == snpobj.variants_alt and complement(self.variants_alt) == snpobj.variants_ref.

Note: Variants where self.variants_ref == self.variants_alt are ignored as they are ambiguous.

Correction Process:

Arguments:
  • snpobj (SNPObject): The reference SNPObject to compare against.
  • check_complement (bool, default=True): If True, also checks for complementary base pairs (A/T, T/A, C/G, and G/C) when identifying swapped variants. Default is True.
  • index_by (str, default='pos'): Criteria for matching variants. Options:
  • common_variants_intersection (tuple of arrays, optional): Precomputed indices of common variants between self and snpobj. If None, intersection is computed within the function.
  • log_stats (bool, default=True): If True, logs statistical information about matching and ambiguous alleles. Default is True.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with corrected flips. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject with corrected flips if inplace=False. If inplace=True, modifies self in place and returns None.

def remove_mismatching_variants( self, snpobj: 'SNPObject', index_by: str = 'pos', common_variants_intersection: Tuple[numpy.ndarray, numpy.ndarray] | None = None, inplace: bool = False) -> ForwardRef('SNPObject') | None:
1532    def remove_mismatching_variants(
1533        self, 
1534        snpobj: 'SNPObject', 
1535        index_by: str = 'pos', 
1536        common_variants_intersection: Optional[Tuple[np.ndarray, np.ndarray]] = None,
1537        inplace: bool = False
1538    ) -> Optional['SNPObject']:
1539        """
1540        Remove variants from `self`, where reference (`variants_ref`) and/or alternate (`variants_alt`) alleles 
1541        do not match with a reference `snpobj`.
1542
1543        Args:
1544            snpobj (SNPObject): 
1545                The reference SNPObject to compare against.
1546            index_by (str, default='pos'): 
1547                Criteria for matching variants. Options:
1548                - `'pos'`: Matches by chromosome and position (`variants_chrom`, `variants_pos`), e.g., 'chr1-12345'.
1549                - `'id'`: Matches by variant ID alone (`variants_id`), e.g., 'rs123'.
1550                - `'pos+id'`: Matches by chromosome, position, and ID (`variants_chrom`, `variants_pos`, `variants_id`), e.g., 'chr1-12345-rs123'.
1551                Default is 'pos'.
1552            common_variants_intersection (tuple of arrays, optional): 
1553                Precomputed indices of common variants between `self` and the reference `snpobj`.
1554                If None, the intersection is computed within the function.
1555            inplace (bool, default=False): 
1556                If True, modifies `self` in place. If False, returns a new `SNPObject` without 
1557                mismatching variants. Default is False.
1558
1559        Returns:
1560            **Optional[SNPObject]:** 
1561                A new `SNPObject` without mismatching variants if `inplace=False`. 
1562                If `inplace=True`, modifies `self` in place and returns None.
1563        """
1564        # Get common variant indices if not provided
1565        if common_variants_intersection is not None:
1566            query_idx, reference_idx = common_variants_intersection
1567        else:
1568            _, query_idx, reference_idx = self.get_common_variants_intersection(snpobj, index_by=index_by)
1569
1570        # Vectorized comparison of `ref` and `alt` alleles
1571        ref_mismatch = self['variants_ref'][query_idx] != snpobj['variants_ref'][reference_idx]
1572        alt_mismatch = self['variants_alt'][query_idx] != snpobj['variants_alt'][reference_idx]
1573        mismatch_mask = ref_mismatch | alt_mismatch
1574
1575        # Identify indices in `self` of mismatching variants
1576        mismatch_idx = query_idx[mismatch_mask]
1577
1578        # Compute total number of variant mismatches
1579        total_mismatches = np.sum(mismatch_mask)
1580
1581        # Filter out mismatching variants
1582        log.debug(f'Removing {total_mismatches} mismatching variants...')
1583        return self.filter_variants(indexes=mismatch_idx, include=True, inplace=inplace)

Remove variants from self, where reference (variants_ref) and/or alternate (variants_alt) alleles do not match with a reference snpobj.

Arguments:
  • snpobj (SNPObject): The reference SNPObject to compare against.
  • index_by (str, default='pos'): Criteria for matching variants. Options:
  • common_variants_intersection (tuple of arrays, optional): Precomputed indices of common variants between self and the reference snpobj. If None, the intersection is computed within the function.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject without mismatching variants. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject without mismatching variants if inplace=False. If inplace=True, modifies self in place and returns None.

def shuffle_variants(self, inplace: bool = False) -> ForwardRef('SNPObject') | None:
1585    def shuffle_variants(self, inplace: bool = False) -> Optional['SNPObject']:
1586        """
1587        Randomly shuffle the positions of variants in the SNPObject, ensuring that all associated 
1588        data (e.g., `calldata_gt` and variant-specific attributes) remain aligned.
1589
1590        Args:
1591            inplace (bool, default=False): 
1592                If True, modifies `self` in place. If False, returns a new `SNPObject` with 
1593                shuffled variants. Default is False.
1594
1595        Returns:
1596            **Optional[SNPObject]:** 
1597                A new `SNPObject` without shuffled variant positions if `inplace=False`. 
1598                If `inplace=True`, modifies `self` in place and returns None.
1599        """
1600        # Generate a random permutation index for shuffling variant positions
1601        shuffle_index = np.random.permutation(self.n_snps)
1602
1603        # Apply shuffling to all relevant attributes using the class's dictionary-like interface
1604        if inplace:
1605            for key in self.keys():
1606                if self[key] is not None:
1607                    if key == 'calldata_gt':
1608                        # `calldata_gt`` has a different shape, so it's shuffled along axis 0
1609                        self[key] = self[key][shuffle_index, ...]
1610                    elif 'variant' in key:
1611                        # snpobj attributes are 1D arrays
1612                        self[key] = np.asarray(self[key])[shuffle_index]
1613            return None
1614        else:
1615            shuffled_snpobj = self.copy()
1616            for key in shuffled_snpobj.keys():
1617                if shuffled_snpobj[key] is not None:
1618                    if key == 'calldata_gt':
1619                        shuffled_snpobj[key] = shuffled_snpobj[key][shuffle_index, ...]
1620                    elif 'variant' in key:
1621                        shuffled_snpobj[key] = np.asarray(shuffled_snpobj[key])[shuffle_index]
1622            return shuffled_snpobj

Randomly shuffle the positions of variants in the SNPObject, ensuring that all associated data (e.g., calldata_gt and variant-specific attributes) remain aligned.

Arguments:
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with shuffled variants. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject without shuffled variant positions if inplace=False. If inplace=True, modifies self in place and returns None.

def set_empty_to_missing(self, inplace: bool = False) -> ForwardRef('SNPObject') | None:
1624    def set_empty_to_missing(self, inplace: bool = False) -> Optional['SNPObject']:
1625        """
1626        Replace empty strings `''` with missing values `'.'` in attributes of `self`.
1627
1628        Args:
1629            inplace (bool, default=False): 
1630                If True, modifies `self` in place. If False, returns a new `SNPObject` with empty 
1631                strings `''` replaced by missing values `'.'`. Default is False.
1632
1633        Returns:
1634            **Optional[SNPObject]:** 
1635                A new `SNPObject` with empty strings replaced if `inplace=False`. 
1636                If `inplace=True`, modifies `self` in place and returns None.
1637        """
1638        if inplace:
1639            if self.variants_alt is not None:
1640                self.variants_alt[self.variants_alt == ''] = '.'
1641            if self.variants_ref is not None:
1642                self.variants_ref[self.variants_ref == ''] = '.'
1643            if self.variants_qual is not None:
1644                self.variants_qual = self.variants_qual.astype(str)
1645                self.variants_qual[(self.variants_qual == '') | (self.variants_qual == 'nan')] = '.'
1646            if self.variants_chrom is not None:
1647                self.variants_chrom = self.variants_chrom.astype(str)
1648                self.variants_chrom[self.variants_chrom == ''] = '.'
1649            if self.variants_filter_pass is not None:
1650                self.variants_filter_pass[self.variants_filter_pass == ''] = '.'
1651            if self.variants_id is not None:
1652                self.variants_id[self.variants_id == ''] = '.'
1653            return self
1654        else:
1655            snpobj = self.copy()
1656            if snpobj.variants_alt is not None:
1657                snpobj.variants_alt[snpobj.variants_alt == ''] = '.'
1658            if snpobj.variants_ref is not None:
1659                snpobj.variants_ref[snpobj.variants_ref == ''] = '.'
1660            if snpobj.variants_qual is not None:
1661                snpobj.variants_qual = snpobj.variants_qual.astype(str)
1662                snpobj.variants_qual[(snpobj.variants_qual == '') | (snpobj.variants_qual == 'nan')] = '.'
1663            if snpobj.variants_chrom is not None:
1664                snpobj.variants_chrom[snpobj.variants_chrom == ''] = '.'
1665            if snpobj.variants_filter_pass is not None:
1666                snpobj.variants_filter_pass[snpobj.variants_filter_pass == ''] = '.'
1667            if snpobj.variants_id is not None:
1668                snpobj.variants_id[snpobj.variants_id == ''] = '.'
1669            return snpobj

Replace empty strings '' with missing values '.' in attributes of self.

Arguments:
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new SNPObject with empty strings '' replaced by missing values '.'. Default is False.
Returns:

Optional[SNPObject]: A new SNPObject with empty strings replaced if inplace=False. If inplace=True, modifies self in place and returns None.

def convert_to_window_level( self, window_size: int | None = None, physical_pos: numpy.ndarray | None = None, chromosomes: numpy.ndarray | None = None, window_sizes: numpy.ndarray | None = None, laiobj: ForwardRef('LocalAncestryObject') | None = None) -> 'LocalAncestryObject':
1671    def convert_to_window_level(
1672        self,
1673        window_size: Optional[int] = None,
1674        physical_pos: Optional[np.ndarray] = None,
1675        chromosomes: Optional[np.ndarray] = None,
1676        window_sizes: Optional[np.ndarray] = None,
1677        laiobj: Optional['LocalAncestryObject'] = None
1678    ) -> 'LocalAncestryObject':
1679        """
1680        Aggregate the `calldata_lai` attribute into genomic windows within a 
1681        `snputils.ancestry.genobj.LocalAncestryObject`.
1682
1683        **Options for defining windows (in order of precedence):**
1684
1685        1. **Fixed window size**:
1686        - Use `window_size` to specify how many SNPs go into each window. The last window on each 
1687        chromosome may be larger if SNPs are not evenly divisible by the size.
1688
1689        2. **Custom start and end positions**:
1690        - Provide `physical_pos` (2D array of shape (n_windows, 2)) as the [start, end] base-pair 
1691         coordinates for each window. 
1692        - If `chromosomes` is not provided and `self` has exactly one chromosome, all windows are 
1693        assumed to belong to that chromosome. 
1694        - If multiple chromosomes exist but `chromosomes` is missing, an error will be raised.
1695        - Optionally, provide `window_sizes` to store the SNP count per-window.
1696
1697        3. **Matching existing windows**:
1698        - Reuse window definitions (`physical_pos`, `chromosomes`, `window_sizes`) from an existing `laiobj`.
1699
1700        Args:
1701            window_size (int, optional): 
1702                Number of SNPs in each window if defining fixed-size windows. If the total number of 
1703                SNPs in a chromosome is not evenly divisible by the window size, the last window on that 
1704                chromosome will include all remaining SNPs and therefore be larger than the specified size.
1705            physical_pos (array of shape (n_windows, 2), optional): 
1706                A 2D array containing the start and end physical positions for each window.
1707            chromosomes (array of shape (n_windows,), optional): 
1708                An array with chromosome numbers corresponding to each genomic window.
1709            window_sizes (array of shape (n_windows,), optional): 
1710                An array specifying the number of SNPs in each genomic window.
1711            laiobj (LocalAncestryObject, optional): 
1712                A reference `LocalAncestryObject` from which to copy existing window definitions.
1713
1714        Returns:
1715            **LocalAncestryObject:** 
1716                A LocalAncestryObject containing window-level ancestry data.
1717        """
1718        from snputils.ancestry.genobj.local import LocalAncestryObject
1719
1720        if window_size is None and physical_pos is None and laiobj is None:
1721            raise ValueError("One of `window_size`, `physical_pos`, or `laiobj` must be provided.")
1722        
1723        # Fixed window size
1724        if window_size is not None:
1725            physical_pos = []   # Boundaries [start, end] of each window
1726            chromosomes = []    # Chromosome for each window
1727            window_sizes = []   # Number of SNPs for each window
1728            for chrom in self.unique_chrom:
1729                # Extract indices corresponding to this chromosome
1730                mask_chrom = (self.variants_chrom == chrom)
1731                # Subset to this chromosome
1732                pos_chrom = self.variants_pos[mask_chrom]
1733                # Number of SNPs for this chromosome
1734                n_snps_chrom = pos_chrom.size
1735                
1736                # Initialize the start of the first window with the position of the first SNP
1737                current_start = self.variants_pos[0]
1738
1739                # Number of full windows with exactly `window_size` SNPs
1740                n_full_windows = n_snps_chrom // window_size
1741
1742                # Build all but the last window
1743                for i in range(n_full_windows-1):
1744                    current_end = self.variants_pos[(i+1) * window_size - 1]
1745                    physical_pos.append([current_start, current_end])
1746                    chromosomes.append(chrom)
1747                    window_sizes.append(window_size)
1748                    current_start = self.variants_pos[(i+1) * window_size]
1749                
1750                # Build the last window
1751                current_end = self.variants_pos[-1]
1752                physical_pos.append([current_start, current_end])
1753                chromosomes.append(chrom)
1754                window_sizes.append(n_snps_chrom - ((n_full_windows - 1) * window_size))
1755                
1756            physical_pos = np.array(physical_pos)
1757            chromosomes = np.array(chromosomes)
1758            window_sizes = np.array(window_sizes)
1759        
1760        # Custom start and end positions
1761        elif physical_pos is not None:
1762            # Check if there is exactly one chromosome
1763            if chromosomes is None:
1764                unique_chrom = self.unique_chrom
1765                if len(unique_chrom) == 1:
1766                    # We assume all windows belong to this single chromosome
1767                    single_chrom = unique_chrom[0]
1768                    chromosomes = np.array([single_chrom] * physical_pos.shape[0])
1769                else:
1770                    raise ValueError("Multiple chromosomes detected, but `chromosomes` was not provided.")
1771
1772        # Match existing windows to a reference laiobj
1773        elif laiobj is not None:
1774            physical_pos = laiobj.physical_pos
1775            chromosomes = laiobj.chromosomes
1776            window_sizes = laiobj.window_sizes
1777
1778        # Allocate an output LAI array
1779        n_windows = physical_pos.shape[0]
1780        n_samples = self.n_samples
1781        if self.calldata_lai.ndim == 3:
1782            lai = np.zeros((n_windows, n_samples, 2))
1783        else:
1784            lai = np.zeros((n_windows, n_samples*2))
1785
1786        # For each window, find the relevant SNPs and compute the mode of the ancestries
1787        for i, ((start, end), chrom) in enumerate(zip(physical_pos, chromosomes)):
1788            snps_mask = (
1789                (self.variants_chrom == chrom) &
1790                (self.variants_pos >= start) &
1791                (self.variants_pos <= end)
1792            )
1793            if np.any(snps_mask):
1794                lai_mask = self.calldata_lai[snps_mask, ...]
1795                mode_ancestries = mode(lai_mask, axis=0, nan_policy='omit').mode
1796                lai[i] = mode_ancestries
1797            else:
1798                lai[i] = np.nan
1799
1800        # Generate haplotype labels, e.g. "Sample1.0", "Sample1.1"
1801        haplotypes = [f"{sample}.{i}" for sample in self.samples for i in range(2)]
1802
1803        # If original data was (n_snps, n_samples, 2), flatten to (n_windows, n_samples*2)
1804        if self.calldata_lai.ndim == 3:
1805            lai = lai.reshape(n_windows, -1)
1806
1807        # Aggregate into a LocalAncestryObject
1808        return LocalAncestryObject(
1809            haplotypes=haplotypes,
1810            lai=lai,
1811            samples=self.samples,
1812            ancestry_map=self.ancestry_map,
1813            window_sizes=window_sizes,
1814            physical_pos=physical_pos,
1815            chromosomes=chromosomes
1816        )

Aggregate the calldata_lai attribute into genomic windows within a snputils.ancestry.genobj.LocalAncestryObject.

Options for defining windows (in order of precedence):

  1. Fixed window size:
  • Use window_size to specify how many SNPs go into each window. The last window on each chromosome may be larger if SNPs are not evenly divisible by the size.
  1. Custom start and end positions:
  • Provide physical_pos (2D array of shape (n_windows, 2)) as the [start, end] base-pair coordinates for each window.
  • If chromosomes is not provided and self has exactly one chromosome, all windows are assumed to belong to that chromosome.
  • If multiple chromosomes exist but chromosomes is missing, an error will be raised.
  • Optionally, provide window_sizes to store the SNP count per-window.
  1. Matching existing windows:
  • Reuse window definitions (physical_pos, chromosomes, window_sizes) from an existing laiobj.
Arguments:
  • window_size (int, optional): Number of SNPs in each window if defining fixed-size windows. If the total number of SNPs in a chromosome is not evenly divisible by the window size, the last window on that chromosome will include all remaining SNPs and therefore be larger than the specified size.
  • physical_pos (array of shape (n_windows, 2), optional): A 2D array containing the start and end physical positions for each window.
  • chromosomes (array of shape (n_windows,), optional): An array with chromosome numbers corresponding to each genomic window.
  • window_sizes (array of shape (n_windows,), optional): An array specifying the number of SNPs in each genomic window.
  • laiobj (LocalAncestryObject, optional): A reference LocalAncestryObject from which to copy existing window definitions.
Returns:

LocalAncestryObject: A LocalAncestryObject containing window-level ancestry data.

def save(self, file: str | pathlib.Path) -> None:
1818    def save(self, file: Union[str, Path]) -> None:
1819        """
1820        Save the data stored in `self` to a specified file.
1821
1822        The format of the saved file is determined by the file extension provided in the `file` 
1823        argument. 
1824        
1825        **Supported formats:**
1826        
1827        - `.bed`: Binary PED (Plink) format.
1828        - `.pgen`: Plink2 binary genotype format.
1829        - `.vcf`: Variant Call Format.
1830        - `.pkl`: Pickle format for saving `self` in serialized form.
1831
1832        Args:
1833            file (str or pathlib.Path): 
1834                Path to the file where the data will be saved. The extension of the file determines the save format. 
1835                Supported extensions: `.bed`, `.pgen`, `.vcf`, `.pkl`.
1836        """
1837        ext = Path(file).suffix.lower()
1838        if ext == '.bed':
1839            self.save_bed(file)
1840        elif ext == '.pgen':
1841            self.save_pgen(file)
1842        elif ext == '.vcf':
1843            self.save_vcf(file)
1844        elif ext == '.pkl':
1845            self.save_pickle(file)
1846        else:
1847            raise ValueError(f"Unsupported file extension: {ext}")

Save the data stored in self to a specified file.

The format of the saved file is determined by the file extension provided in the file argument.

Supported formats:

  • .bed: Binary PED (Plink) format.
  • .pgen: Plink2 binary genotype format.
  • .vcf: Variant Call Format.
  • .pkl: Pickle format for saving self in serialized form.
Arguments:
  • file (str or pathlib.Path): Path to the file where the data will be saved. The extension of the file determines the save format. Supported extensions: .bed, .pgen, .vcf, .pkl.
def save_bed(self, file: str | pathlib.Path) -> None:
1849    def save_bed(self, file: Union[str, Path]) -> None:
1850        """
1851        Save the data stored in `self` to a `.bed` file.
1852
1853        Args:
1854            file (str or pathlib.Path): 
1855                Path to the file where the data will be saved. It should end with `.bed`. 
1856                If the provided path does not have this extension, it will be appended.
1857        """
1858        from snputils.snp.io.write.bed import BEDWriter
1859        writer = BEDWriter(snpobj=self, filename=file)
1860        writer.write()

Save the data stored in self to a .bed file.

Arguments:
  • file (str or pathlib.Path): Path to the file where the data will be saved. It should end with .bed. If the provided path does not have this extension, it will be appended.
def save_pgen(self, file: str | pathlib.Path) -> None:
1862    def save_pgen(self, file: Union[str, Path]) -> None:
1863        """
1864        Save the data stored in `self` to a `.pgen` file.
1865
1866        Args:
1867            file (str or pathlib.Path): 
1868                Path to the file where the data will be saved. It should end with `.pgen`. 
1869                If the provided path does not have this extension, it will be appended.
1870        """
1871        from snputils.snp.io.write.pgen import PGENWriter
1872        writer = PGENWriter(snpobj=self, filename=file)
1873        writer.write()

Save the data stored in self to a .pgen file.

Arguments:
  • file (str or pathlib.Path): Path to the file where the data will be saved. It should end with .pgen. If the provided path does not have this extension, it will be appended.
def save_vcf(self, file: str | pathlib.Path) -> None:
1875    def save_vcf(self, file: Union[str, Path]) -> None:
1876        """
1877        Save the data stored in `self` to a `.vcf` file.
1878
1879        Args:
1880            file (str or pathlib.Path): 
1881                Path to the file where the data will be saved. It should end with `.vcf`. 
1882                If the provided path does not have this extension, it will be appended.
1883        """
1884        from snputils.snp.io.write.vcf import VCFWriter
1885        writer = VCFWriter(snpobj=self, filename=file)
1886        writer.write()

Save the data stored in self to a .vcf file.

Arguments:
  • file (str or pathlib.Path): Path to the file where the data will be saved. It should end with .vcf. If the provided path does not have this extension, it will be appended.
def save_pickle(self, file: str | pathlib.Path) -> None:
1888    def save_pickle(self, file: Union[str, Path]) -> None:
1889        """
1890        Save `self` in serialized form to a `.pkl` file.
1891
1892        Args:
1893            file (str or pathlib.Path): 
1894                Path to the file where the data will be saved. It should end with `.pkl`. 
1895                If the provided path does not have this extension, it will be appended.
1896        """
1897        import pickle
1898        with open(file, 'wb') as file:
1899            pickle.dump(self, file)

Save self in serialized form to a .pkl file.

Arguments:
  • file (str or pathlib.Path): Path to the file where the data will be saved. It should end with .pkl. If the provided path does not have this extension, it will be appended.