snputils.ancestry.genobj

1from .wide import GlobalAncestryObject
2from .local import LocalAncestryObject
3
4__all__ = ['GlobalAncestryObject', 'LocalAncestryObject']
class GlobalAncestryObject(snputils.ancestry.genobj.base.AncestryObject):
 10class GlobalAncestryObject(AncestryObject):
 11    """
 12    A class for Global Ancestry Inference (GAI) data.
 13    """
 14    def __init__(
 15        self,
 16        Q: np.ndarray,
 17        P: np.ndarray,
 18        samples: Optional[Sequence] = None,
 19        snps: Optional[Sequence] = None,
 20        ancestries: Optional[Sequence] = None
 21    ) -> None:
 22        """
 23        Args:
 24            Q (array of shape (n_samples, n_ancestries)):
 25                A 2D array containing per-sample ancestry proportions. Each row corresponds to a sample,
 26                and each column corresponds to an ancestry.
 27            P (array of shape (n_snps, n_ancestries)):
 28                A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP,
 29                and each column corresponds to an ancestry.
 30            samples (sequence of length n_samples, optional):
 31                A sequence containing unique identifiers for each sample. If None, sample identifiers 
 32                are assigned as integers from `0` to `n_samples - 1`.
 33            snps (sequence of length n_snps, optional):
 34                A sequence containing identifiers for each SNP. If None, SNPs are assigned as integers 
 35                from `0` to `n_snps - 1`.
 36            ancestries (sequence of length n_samples, optional):
 37                A sequence containing ancestry labels for each sample.
 38        """
 39        # Determine dimensions
 40        n_samples, n_ancestries_Q = Q.shape
 41        n_snps, n_ancestries_P = P.shape
 42
 43        if n_ancestries_Q != n_ancestries_P:
 44            raise ValueError(
 45                f"The number of ancestries in Q ({n_ancestries_Q}) and P ({n_ancestries_P}) must be the same."
 46            )
 47
 48        n_ancestries = n_ancestries_Q
 49
 50        # Assign default sample identifiers if none provided
 51        if samples is None:
 52            samples = list(range(n_samples))
 53        else:
 54            samples = list(samples)
 55            if len(samples) != n_samples:
 56                raise ValueError(
 57                    f"Length of samples ({len(samples)}) does not match number of samples ({n_samples})."
 58                )
 59
 60        # Assign default SNP identifiers if none provided
 61        if snps is None:
 62            snps = list(range(n_snps))
 63        else:
 64            snps = list(snps)
 65            if len(snps) != n_snps:
 66                raise ValueError(
 67                    f"Length of snps ({len(snps)}) does not match number of SNPs ({n_snps})."
 68                )
 69
 70        if len(ancestries) != n_samples:
 71            raise ValueError(
 72                f"Length of ancestries ({len(ancestries)}) does not match number of samples ({n_samples})."
 73            )
 74
 75        super().__init__(n_samples, n_ancestries)
 76
 77        # Store attributes
 78        self.__Q = Q
 79        self.__P = P
 80        self.__samples = np.asarray(samples)
 81        self.__snps = np.asarray(snps)
 82        self.__ancestries = np.asarray(ancestries)
 83
 84        # Perform sanity checks
 85        self._sanity_check()
 86
 87    def __getitem__(self, key):
 88        """
 89        To access an attribute of the class using the square bracket notation,
 90        similar to a dictionary.
 91        """
 92        try:
 93            return getattr(self, key)
 94        except AttributeError:
 95            raise KeyError(f'Invalid key: {key}')
 96
 97    def __setitem__(self, key, value):
 98        """
 99        To set an attribute of the class using the square bracket notation,
100        similar to a dictionary.
101        """
102        try:
103            setattr(self, key, value)
104        except AttributeError:
105            raise KeyError(f'Invalid key: {key}')
106
107    @property
108    def Q(self) -> np.ndarray:
109        """
110        Retrieve `Q`.
111
112        Returns:
113            **array of shape (n_samples, n_ancestries):** 
114                A 2D array containing per-sample ancestry proportions. Each row corresponds to a sample,
115                and each column corresponds to an ancestry.
116        """
117        return self.__Q
118    
119    @Q.setter
120    def Q(self, x: np.ndarray):
121        """
122        Update `Q`.
123        """
124        if x.shape != (self.n_samples, self.n_ancestries):
125            raise ValueError(
126                f"Q must have shape ({self.n_samples}, {self.n_ancestries}); got {x.shape}."
127            )
128        self.__Q = x
129    
130    @property
131    def P(self) -> np.ndarray:
132        """
133        Retrieve `P`.
134
135        Returns:
136            **array of shape (n_snps, n_ancestries):** 
137                A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP,
138                and each column corresponds to an ancestry.
139        """
140        return self.__P
141
142    @P.setter
143    def P(self, x: np.ndarray):
144        """
145        Update `P`.
146        """
147        if x.shape != (self.n_snps, self.n_ancestries):
148            raise ValueError(
149                f"P must have shape ({self.n_snps}, {self.n_ancestries}). Provided shape is {x.shape}."
150            )
151        self.__P = x
152        self._sanity_check()
153    
154    @property
155    def F(self) -> np.ndarray:
156        """
157        Alias for `P`.
158
159        Returns:
160            **array of shape (n_snps, n_ancestries):** 
161                A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP,
162                and each column corresponds to an ancestry.
163        """
164        return self.P
165
166    @F.setter
167    def F(self, x: np.ndarray):
168        """
169        Update `F`.
170        """
171        if x.shape != (self.n_snps, self.n_ancestries):
172            raise ValueError(
173                f"F must have shape ({self.n_snps}, {self.n_ancestries}). Provided shape is {x.shape}."
174            )
175        self.__P = x
176    
177    @property
178    def samples(self) -> Optional[np.ndarray]:
179        """
180        Retrieve `samples`.
181
182        Returns:
183            **array of shape (n_samples,):** 
184                An array containing unique identifiers for each sample. If None, sample 
185                identifiers are assigned as integers from `0` to `n_samples - 1`.
186        """
187        return self.__samples
188        
189    @samples.setter
190    def samples(self, x: Sequence):
191        """
192        Update `samples`.
193        """
194        x = list(x)
195        if len(x) != self.n_samples:
196            raise ValueError(
197                f"samples must have length {self.n_samples}; got length {len(x)}."
198            )
199        self.__samples = x
200
201    @property
202    def snps(self) -> Optional[np.ndarray]:
203        """
204        Retrieve `snps`.
205
206        Returns:
207            **array of shape (n_snps,):** 
208                An array containing identifiers for each SNP. If None, SNPs are assigned as integers 
209                from `0` to `n_snps - 1`.
210        """
211        return self.__snps
212
213    @snps.setter
214    def snps(self, x: Sequence):
215        """
216        Update `snps`.
217        """
218        x = list(x)
219        if len(x) != self.n_snps:
220            raise ValueError(
221                f"snps must have length {self.n_snps}; got length {len(x)}."
222            )
223        self.__snps = np.asarray(x)
224
225    @property
226    def ancestries(self) -> Optional[np.ndarray]:
227        """
228        Retrieve `ancestries`.
229
230        Returns:
231            **array of shape (n_samples,):** 
232                An array containing ancestry labels for each sample.
233        """
234        return self.__ancestries
235    
236    @ancestries.setter
237    def ancestries(self, x: Sequence):
238        """
239        Update `ancestries`.
240        """
241        x = list(x)
242        num_x = len(x)
243        num_unique_x = len(np.unique(x))
244
245        if num_x != self.n_samples:
246            raise ValueError(
247                f"ancestries must have length {self.n_samples}; got length {num_x}."
248            )
249        if num_unique_x <= self.n_ancestries:
250            raise ValueError(
251                f"Number of unique ancestry labels must be less than or equal to {self.n_ancestries}; got {num_unique_x} unique labels."
252            )
253        self.__ancestries = np.asarray(x)
254    
255    @property
256    def n_samples(self) -> int:
257        """
258        Retrieve `n_samples`.
259
260        Returns:
261            **int:** The total number of samples.
262        """
263        return self.__Q.shape[0]
264
265    @property
266    def n_snps(self) -> int:
267        """
268        Retrieve `n_snps`.
269
270        Returns:
271            **int:** The total number of SNPs.
272        """
273        return self.__P.shape[0]
274
275    @property
276    def n_ancestries(self) -> int:
277        """
278        Retrieve `n_ancestries`.
279
280        Returns:
281            **int:** The total number of unique ancestries.
282        """
283        return self.__Q.shape[1]
284
285    def copy(self) -> 'GlobalAncestryObject':
286        """
287        Create and return a copy of `self`.
288
289        Returns:
290            **GlobalAncestryObject:** A new instance of the current object.
291        """
292        return copy.copy(self)
293
294    def keys(self) -> List[str]:
295        """
296        Retrieve a list of public attribute names for `self`.
297
298        Returns:
299            **list of str:** 
300                A list of attribute names, with internal name-mangling removed, 
301                for easier reference to public attributes in the instance.
302        """
303        return [attr.replace('_GlobalAncestryObject__', '').replace('_AncestryObject__', '') for attr in vars(self)]
304
305    def _sanity_check(self) -> None:
306        """
307        Perform sanity checks to ensure that matrix dimensions are consistent with expected sizes.
308        
309        Raises:
310            **ValueError:** If any of the matrix dimensions do not match the expected sizes.
311        """       
312        # Check that the Q matrix has the correct shape
313        if self.__Q.shape != (self.n_samples, self.n_ancestries):
314            raise ValueError(
315                f"Q must have shape ({self.n_samples}, {self.n_ancestries}); got {self.__Q.shape}."
316            )
317
318        # Check that the P matrix has the correct shape
319        if self.__P.shape != (self.n_snps, self.n_ancestries):
320            raise ValueError(
321                f"P must have shape ({self.n_snps}, {self.n_ancestries}); got {self.__P.shape}."
322            )
323
324        # Check that samples length matches n_samples
325        if self.samples is not None:
326            if len(self.__samples) != self.n_samples:
327                raise ValueError(
328                    f"samples must have length {self.n_samples}; got length {len(self.__samples)}."
329                )
330
331        # Check that snps length matches n_snps
332        if self.snps is not None:
333            if len(self.__snps) != self.n_snps:
334                raise ValueError(
335                    f"snps must have length {self.n_snps}; got length {len(self.__snps)}."
336                )
337
338        # Check that ancestries length matches n_samples
339        if self.ancestries is not None:
340            if len(self.__ancestries) != self.n_samples:
341                raise ValueError(
342                    f"ancestries must have length {self.n_samples}; got length {len(self.__ancestries)}."
343                )
344
345            # Check number of unique ancestry labels
346            num_unique_ancestries = len(np.unique(self.__ancestries))
347            if num_unique_ancestries <= self.n_ancestries:
348                raise ValueError(
349                    f"Number of unique ancestry labels must be less than or equal to {self.n_ancestries}; got {num_unique_ancestries} unique labels."
350                )
351
352    def save(self, file: Union[str, Path]) -> None:
353        """
354        Save the data stored in `self` to a specified file or set of files.
355
356        The format of the saved file(s) is determined by the file extension provided in the `file` 
357        argument. If the extension is `.pkl`, the object is serialized as a pickle file. Otherwise, 
358        the file is treated as a prefix for saving ADMIXTURE files.
359
360        **Supported formats:**
361
362        - `.pkl`: Pickle format for saving `self` in serialized form.
363        - Any other extension or no extension: Treated as a prefix for ADMIXTURE files.
364
365        Args:
366            file (str or pathlib.Path): 
367                Path to the file where the data will be saved. If the extension is `.pkl`, the object
368                is serialized. Otherwise, it is treated as a prefix for ADMIXTURE files.
369        """
370        path = Path(file)
371        suffix = path.suffix.lower()
372
373        if suffix == '.pkl':
374            self.save_pickle(path)
375        else:
376            self.save_admixture(path)
377
378    def save_admixture(self, file_prefix: Union[str, Path]) -> None:
379        """
380        Save the data stored in `self` into multiple ADMIXTURE files.
381        If the file already exists, it will be overwritten.
382
383        **Output files:**
384
385        - `<file_prefix>.K.Q`: Q matrix file. The file uses space (' ') as the delimiter.
386        - `<file_prefix>.K.P`: P matrix file. The file uses space (' ') as the delimiter.
387        - `<file_prefix>.sample_ids.txt`: Sample IDs file (if sample IDs are available).
388        - `<file_prefix>.snp_ids.txt`: SNP IDs file (if SNP IDs are available).
389        - `<file_prefix>.map`: Ancestry file (if ancestries information is available).
390        
391        Args:
392            file_prefix (str or pathlib.Path): 
393                The base prefix for output file names, including directory path but excluding file extensions. 
394                The prefix is used to generate specific file names for each output, with file-specific 
395                suffixes appended as described above (e.g., `file_prefix.n_ancestries.Q` for the Q matrix file).
396        """
397        from snputils.ancestry.io.wide.write.admixture import AdmixtureWriter
398
399        AdmixtureWriter(self, file_prefix).write()
400
401    def save_pickle(self, file: Union[str, Path]) -> None:
402        """
403        Save `self` in serialized form to a `.pkl` file.
404        If the file already exists, it will be overwritten.
405
406        Args:
407            file (str or pathlib.Path): 
408                Path to the file where the data will be saved. It should end with `.pkl`. 
409                If the provided path does not have this extension, it will be appended.
410        """
411        import pickle
412        with open(file, 'wb') as file:
413            pickle.dump(self, file)

A class for Global Ancestry Inference (GAI) data.

GlobalAncestryObject( Q: numpy.ndarray, P: numpy.ndarray, samples: Optional[Sequence] = None, snps: Optional[Sequence] = None, ancestries: Optional[Sequence] = None)
14    def __init__(
15        self,
16        Q: np.ndarray,
17        P: np.ndarray,
18        samples: Optional[Sequence] = None,
19        snps: Optional[Sequence] = None,
20        ancestries: Optional[Sequence] = None
21    ) -> None:
22        """
23        Args:
24            Q (array of shape (n_samples, n_ancestries)):
25                A 2D array containing per-sample ancestry proportions. Each row corresponds to a sample,
26                and each column corresponds to an ancestry.
27            P (array of shape (n_snps, n_ancestries)):
28                A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP,
29                and each column corresponds to an ancestry.
30            samples (sequence of length n_samples, optional):
31                A sequence containing unique identifiers for each sample. If None, sample identifiers 
32                are assigned as integers from `0` to `n_samples - 1`.
33            snps (sequence of length n_snps, optional):
34                A sequence containing identifiers for each SNP. If None, SNPs are assigned as integers 
35                from `0` to `n_snps - 1`.
36            ancestries (sequence of length n_samples, optional):
37                A sequence containing ancestry labels for each sample.
38        """
39        # Determine dimensions
40        n_samples, n_ancestries_Q = Q.shape
41        n_snps, n_ancestries_P = P.shape
42
43        if n_ancestries_Q != n_ancestries_P:
44            raise ValueError(
45                f"The number of ancestries in Q ({n_ancestries_Q}) and P ({n_ancestries_P}) must be the same."
46            )
47
48        n_ancestries = n_ancestries_Q
49
50        # Assign default sample identifiers if none provided
51        if samples is None:
52            samples = list(range(n_samples))
53        else:
54            samples = list(samples)
55            if len(samples) != n_samples:
56                raise ValueError(
57                    f"Length of samples ({len(samples)}) does not match number of samples ({n_samples})."
58                )
59
60        # Assign default SNP identifiers if none provided
61        if snps is None:
62            snps = list(range(n_snps))
63        else:
64            snps = list(snps)
65            if len(snps) != n_snps:
66                raise ValueError(
67                    f"Length of snps ({len(snps)}) does not match number of SNPs ({n_snps})."
68                )
69
70        if len(ancestries) != n_samples:
71            raise ValueError(
72                f"Length of ancestries ({len(ancestries)}) does not match number of samples ({n_samples})."
73            )
74
75        super().__init__(n_samples, n_ancestries)
76
77        # Store attributes
78        self.__Q = Q
79        self.__P = P
80        self.__samples = np.asarray(samples)
81        self.__snps = np.asarray(snps)
82        self.__ancestries = np.asarray(ancestries)
83
84        # Perform sanity checks
85        self._sanity_check()
Arguments:
  • Q (array of shape (n_samples, n_ancestries)): A 2D array containing per-sample ancestry proportions. Each row corresponds to a sample, and each column corresponds to an ancestry.
  • P (array of shape (n_snps, n_ancestries)): A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP, and each column corresponds to an ancestry.
  • samples (sequence of length n_samples, optional): A sequence containing unique identifiers for each sample. If None, sample identifiers are assigned as integers from 0 to n_samples - 1.
  • snps (sequence of length n_snps, optional): A sequence containing identifiers for each SNP. If None, SNPs are assigned as integers from 0 to n_snps - 1.
  • ancestries (sequence of length n_samples, optional): A sequence containing ancestry labels for each sample.
Q: numpy.ndarray
107    @property
108    def Q(self) -> np.ndarray:
109        """
110        Retrieve `Q`.
111
112        Returns:
113            **array of shape (n_samples, n_ancestries):** 
114                A 2D array containing per-sample ancestry proportions. Each row corresponds to a sample,
115                and each column corresponds to an ancestry.
116        """
117        return self.__Q

Retrieve Q.

Returns:

array of shape (n_samples, n_ancestries): A 2D array containing per-sample ancestry proportions. Each row corresponds to a sample, and each column corresponds to an ancestry.

P: numpy.ndarray
130    @property
131    def P(self) -> np.ndarray:
132        """
133        Retrieve `P`.
134
135        Returns:
136            **array of shape (n_snps, n_ancestries):** 
137                A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP,
138                and each column corresponds to an ancestry.
139        """
140        return self.__P

Retrieve P.

Returns:

array of shape (n_snps, n_ancestries): A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP, and each column corresponds to an ancestry.

F: numpy.ndarray
154    @property
155    def F(self) -> np.ndarray:
156        """
157        Alias for `P`.
158
159        Returns:
160            **array of shape (n_snps, n_ancestries):** 
161                A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP,
162                and each column corresponds to an ancestry.
163        """
164        return self.P

Alias for P.

Returns:

array of shape (n_snps, n_ancestries): A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP, and each column corresponds to an ancestry.

samples: Optional[numpy.ndarray]
177    @property
178    def samples(self) -> Optional[np.ndarray]:
179        """
180        Retrieve `samples`.
181
182        Returns:
183            **array of shape (n_samples,):** 
184                An array containing unique identifiers for each sample. If None, sample 
185                identifiers are assigned as integers from `0` to `n_samples - 1`.
186        """
187        return self.__samples

Retrieve samples.

Returns:

array of shape (n_samples,): An array containing unique identifiers for each sample. If None, sample identifiers are assigned as integers from 0 to n_samples - 1.

snps: Optional[numpy.ndarray]
201    @property
202    def snps(self) -> Optional[np.ndarray]:
203        """
204        Retrieve `snps`.
205
206        Returns:
207            **array of shape (n_snps,):** 
208                An array containing identifiers for each SNP. If None, SNPs are assigned as integers 
209                from `0` to `n_snps - 1`.
210        """
211        return self.__snps

Retrieve snps.

Returns:

array of shape (n_snps,): An array containing identifiers for each SNP. If None, SNPs are assigned as integers from 0 to n_snps - 1.

ancestries: Optional[numpy.ndarray]
225    @property
226    def ancestries(self) -> Optional[np.ndarray]:
227        """
228        Retrieve `ancestries`.
229
230        Returns:
231            **array of shape (n_samples,):** 
232                An array containing ancestry labels for each sample.
233        """
234        return self.__ancestries

Retrieve ancestries.

Returns:

array of shape (n_samples,): An array containing ancestry labels for each sample.

n_samples: int
255    @property
256    def n_samples(self) -> int:
257        """
258        Retrieve `n_samples`.
259
260        Returns:
261            **int:** The total number of samples.
262        """
263        return self.__Q.shape[0]

Retrieve n_samples.

Returns:

int: The total number of samples.

n_snps: int
265    @property
266    def n_snps(self) -> int:
267        """
268        Retrieve `n_snps`.
269
270        Returns:
271            **int:** The total number of SNPs.
272        """
273        return self.__P.shape[0]

Retrieve n_snps.

Returns:

int: The total number of SNPs.

n_ancestries: int
275    @property
276    def n_ancestries(self) -> int:
277        """
278        Retrieve `n_ancestries`.
279
280        Returns:
281            **int:** The total number of unique ancestries.
282        """
283        return self.__Q.shape[1]

Retrieve n_ancestries.

Returns:

int: The total number of unique ancestries.

def copy(self) -> GlobalAncestryObject:
285    def copy(self) -> 'GlobalAncestryObject':
286        """
287        Create and return a copy of `self`.
288
289        Returns:
290            **GlobalAncestryObject:** A new instance of the current object.
291        """
292        return copy.copy(self)

Create and return a copy of self.

Returns:

GlobalAncestryObject: A new instance of the current object.

def keys(self) -> List[str]:
294    def keys(self) -> List[str]:
295        """
296        Retrieve a list of public attribute names for `self`.
297
298        Returns:
299            **list of str:** 
300                A list of attribute names, with internal name-mangling removed, 
301                for easier reference to public attributes in the instance.
302        """
303        return [attr.replace('_GlobalAncestryObject__', '').replace('_AncestryObject__', '') for attr in vars(self)]

Retrieve a list of public attribute names for self.

Returns:

list of str: A list of attribute names, with internal name-mangling removed, for easier reference to public attributes in the instance.

def save(self, file: Union[str, pathlib._local.Path]) -> None:
352    def save(self, file: Union[str, Path]) -> None:
353        """
354        Save the data stored in `self` to a specified file or set of files.
355
356        The format of the saved file(s) is determined by the file extension provided in the `file` 
357        argument. If the extension is `.pkl`, the object is serialized as a pickle file. Otherwise, 
358        the file is treated as a prefix for saving ADMIXTURE files.
359
360        **Supported formats:**
361
362        - `.pkl`: Pickle format for saving `self` in serialized form.
363        - Any other extension or no extension: Treated as a prefix for ADMIXTURE files.
364
365        Args:
366            file (str or pathlib.Path): 
367                Path to the file where the data will be saved. If the extension is `.pkl`, the object
368                is serialized. Otherwise, it is treated as a prefix for ADMIXTURE files.
369        """
370        path = Path(file)
371        suffix = path.suffix.lower()
372
373        if suffix == '.pkl':
374            self.save_pickle(path)
375        else:
376            self.save_admixture(path)

Save the data stored in self to a specified file or set of files.

The format of the saved file(s) is determined by the file extension provided in the file argument. If the extension is .pkl, the object is serialized as a pickle file. Otherwise, the file is treated as a prefix for saving ADMIXTURE files.

Supported formats:

  • .pkl: Pickle format for saving self in serialized form.
  • Any other extension or no extension: Treated as a prefix for ADMIXTURE files.
Arguments:
  • file (str or pathlib.Path): Path to the file where the data will be saved. If the extension is .pkl, the object is serialized. Otherwise, it is treated as a prefix for ADMIXTURE files.
def save_admixture(self, file_prefix: Union[str, pathlib._local.Path]) -> None:
378    def save_admixture(self, file_prefix: Union[str, Path]) -> None:
379        """
380        Save the data stored in `self` into multiple ADMIXTURE files.
381        If the file already exists, it will be overwritten.
382
383        **Output files:**
384
385        - `<file_prefix>.K.Q`: Q matrix file. The file uses space (' ') as the delimiter.
386        - `<file_prefix>.K.P`: P matrix file. The file uses space (' ') as the delimiter.
387        - `<file_prefix>.sample_ids.txt`: Sample IDs file (if sample IDs are available).
388        - `<file_prefix>.snp_ids.txt`: SNP IDs file (if SNP IDs are available).
389        - `<file_prefix>.map`: Ancestry file (if ancestries information is available).
390        
391        Args:
392            file_prefix (str or pathlib.Path): 
393                The base prefix for output file names, including directory path but excluding file extensions. 
394                The prefix is used to generate specific file names for each output, with file-specific 
395                suffixes appended as described above (e.g., `file_prefix.n_ancestries.Q` for the Q matrix file).
396        """
397        from snputils.ancestry.io.wide.write.admixture import AdmixtureWriter
398
399        AdmixtureWriter(self, file_prefix).write()

Save the data stored in self into multiple ADMIXTURE files. If the file already exists, it will be overwritten.

Output files:

  • <file_prefix>.K.Q: Q matrix file. The file uses space (' ') as the delimiter.
  • <file_prefix>.K.P: P matrix file. The file uses space (' ') as the delimiter.
  • <file_prefix>.sample_ids.txt: Sample IDs file (if sample IDs are available).
  • <file_prefix>.snp_ids.txt: SNP IDs file (if SNP IDs are available).
  • <file_prefix>.map: Ancestry file (if ancestries information is available).
Arguments:
  • file_prefix (str or pathlib.Path): The base prefix for output file names, including directory path but excluding file extensions. The prefix is used to generate specific file names for each output, with file-specific suffixes appended as described above (e.g., file_prefix.n_ancestries.Q for the Q matrix file).
def save_pickle(self, file: Union[str, pathlib._local.Path]) -> None:
401    def save_pickle(self, file: Union[str, Path]) -> None:
402        """
403        Save `self` in serialized form to a `.pkl` file.
404        If the file already exists, it will be overwritten.
405
406        Args:
407            file (str or pathlib.Path): 
408                Path to the file where the data will be saved. It should end with `.pkl`. 
409                If the provided path does not have this extension, it will be appended.
410        """
411        import pickle
412        with open(file, 'wb') as file:
413            pickle.dump(self, file)

Save self in serialized form to a .pkl file. If the file already exists, it will be overwritten.

Arguments:
  • file (str or pathlib.Path): Path to the file where the data will be saved. It should end with .pkl. If the provided path does not have this extension, it will be appended.
class LocalAncestryObject(snputils.ancestry.genobj.base.AncestryObject):
 17class LocalAncestryObject(AncestryObject):
 18    """
 19    A class for window-level Local Ancestry Inference (LAI) data.
 20    """
 21    def __init__(
 22        self,
 23        haplotypes: List[str], 
 24        lai: np.ndarray,
 25        samples: Optional[List[str]] = None, 
 26        ancestry_map: Optional[Dict[str, str]] = None, 
 27        window_sizes: Optional[np.ndarray] = None,
 28        centimorgan_pos: Optional[np.ndarray] = None,
 29        chromosomes: Optional[np.ndarray] = None,
 30        physical_pos: Optional[np.ndarray] = None
 31    ) -> None:
 32        """
 33        Args:
 34            haplotypes (list of str of length n_haplotypes):
 35                A list of unique haplotype identifiers.
 36            lai (array of shape (n_windows, n_haplotypes)): 
 37                A 2D array containing local ancestry inference values, where each row represents a 
 38                genomic window, and each column corresponds to a haplotype phase for each sample.
 39            samples (list of str of length n_samples, optional):
 40                A list of unique sample identifiers.
 41            ancestry_map (dict of str to str, optional):
 42                A dictionary mapping ancestry codes to region names.
 43            window_sizes (array of shape (n_windows,), optional): 
 44                An array specifying the number of SNPs in each genomic window.
 45            centimorgan_pos (array of shape (n_windows, 2), optional): 
 46                A 2D array containing the start and end centimorgan positions for each window.
 47            chromosomes (array of shape (n_windows,), optional): 
 48                An array with chromosome numbers corresponding to each genomic window.
 49            physical_pos (array of shape (n_windows, 2), optional): 
 50                A 2D array containing the start and end physical positions for each window.
 51        """
 52        if lai.ndim != 2:
 53            raise ValueError("`lai` must be a 2D array with shape (n_windows, n_haplotypes).")
 54        
 55        # Determine the number of unique ancestries and samples from the LAI array
 56        n_ancestries = len(np.unique(lai))
 57        n_haplotypes = lai.shape[1]
 58        n_samples = n_haplotypes // 2
 59
 60        super(LocalAncestryObject, self).__init__(n_samples, n_ancestries)
 61
 62        self.__haplotypes = haplotypes
 63        self.__lai = lai
 64        self.__window_sizes = window_sizes
 65        self.__centimorgan_pos = centimorgan_pos
 66        self.__samples = samples
 67        self.__chromosomes = chromosomes
 68        self.__physical_pos = physical_pos
 69        self.__ancestry_map = ancestry_map
 70
 71        # Perform sanity check to ensure all unique ancestries in LAI data are represented in the ancestry map
 72        self._sanity_check()
 73
 74    def __getitem__(self, key):
 75        """
 76        To access an attribute of the class using the square bracket notation,
 77        similar to a dictionary.
 78        """
 79        try:
 80            return getattr(self, key)
 81        except AttributeError:
 82            raise KeyError(f'Invalid key: {key}')
 83
 84    def __setitem__(self, key, value):
 85        """
 86        To set an attribute of the class using the square bracket notation,
 87        similar to a dictionary.
 88        """
 89        try:
 90            setattr(self, key, value)
 91        except AttributeError:
 92            raise KeyError(f'Invalid key: {key}')
 93
 94    @property
 95    def haplotypes(self) -> List[str]:
 96        """
 97        Retrieve `haplotypes`.
 98
 99        Returns:
100            **list of length n_haplotypes:** A list of unique haplotype identifiers.
101        """
102        return self.__haplotypes
103
104    @haplotypes.setter
105    def haplotypes(self, x):
106        """
107        Update `haplotypes`.
108        """
109        self.__haplotypes = x
110
111    @property
112    def lai(self) -> np.ndarray:
113        """
114        Retrieve `lai`.
115
116        Returns:
117            **array of shape (n_windows, n_haplotypes):** 
118                A 2D array containing local ancestry inference values, where each row represents a 
119                genomic window, and each column corresponds to a haplotype phase for each sample.
120        """
121        return self.__lai
122
123    @lai.setter
124    def lai(self, x):
125        """
126        Update `lai`.
127        """
128        self.__lai = x
129
130    @property
131    def samples(self) -> Optional[List[str]]:
132        """
133        Retrieve `samples`.
134
135        Returns:
136            **list of str:** A list of unique sample identifiers.
137        """
138        if self.__samples is not None:
139            return self.__samples
140        elif self.__haplotypes is not None:
141            return [hap.split('.')[0] for hap in self.__haplotypes][::2]
142        else:
143            return None
144    
145    @samples.setter
146    def samples(self, x):
147        """
148        Update `samples`.
149        """
150        self.__samples = x
151
152    @property
153    def ancestry_map(self) -> Optional[Dict[str, str]]:
154        """
155        Retrieve `ancestry_map`.
156
157        Returns:
158            **dict of str to str:** A dictionary mapping ancestry codes to region names.
159        """
160        return self.__ancestry_map
161
162    @ancestry_map.setter
163    def ancestry_map(self, x):
164        """
165        Update `ancestry_map`.
166        """
167        self.__ancestry_map = x
168
169    @property
170    def window_sizes(self) -> Optional[np.ndarray]:
171        """
172        Retrieve `window_sizes`.
173
174        Returns:
175            **array of shape (n_windows,):** 
176                An array specifying the number of SNPs in each genomic window.
177        """
178        return self.__window_sizes
179        
180    @window_sizes.setter
181    def window_sizes(self, x):
182        """
183        Update `window_sizes`.
184        """
185        self.__window_sizes = x
186
187    @property
188    def centimorgan_pos(self) -> Optional[np.ndarray]:
189        """
190        Retrieve `centimorgan_pos`.
191
192        Returns:
193            **array of shape (n_windows, 2):** 
194                A 2D array containing the start and end centimorgan positions for each window.
195        """
196        return self.__centimorgan_pos
197
198    @centimorgan_pos.setter
199    def centimorgan_pos(self, x):
200        """
201        Update `centimorgan_pos`.
202        """
203        self.__centimorgan_pos = x
204
205    @property
206    def chromosomes(self) -> Optional[np.ndarray]:
207        """
208        Retrieve `chromosomes`.
209
210        Returns:
211            **array of shape (n_windows,):** 
212                An array with chromosome numbers corresponding to each genomic window.
213        """
214        return self.__chromosomes
215        
216    @chromosomes.setter
217    def chromosomes(self, x):
218        """
219        Update `chromosomes`.
220        """
221        self.__chromosomes = x
222
223    @property
224    def physical_pos(self) -> Optional[np.ndarray]:
225        """
226        Retrieve `physical_pos`.
227
228        Returns:
229            **array of shape (n_windows, 2):** 
230                A 2D array containing the start and end physical positions for each window.
231        """
232        return self.__physical_pos
233
234    @physical_pos.setter
235    def physical_pos(self, x):
236        """
237        Update `physical_pos`.
238        """
239        self.__physical_pos = x
240
241    @property
242    def n_samples(self) -> int:
243        """
244        Retrieve `n_samples`.
245
246        Returns:
247            **int:** 
248                The total number of samples.
249        """
250        if self.__samples is not None:
251            return len(self.__samples)
252        elif self.__haplotypes is not None:
253            # Divide by 2 because each sample has two associated haplotypes
254            return len(self.__haplotypes) // 2
255        else:
256            # Divide by 2 because columns represent haplotypes
257            return self.__lai.shape[1] // 2
258
259    @property
260    def n_ancestries(self) -> int:
261        """
262        Retrieve `n_ancestries`.
263
264        Returns:
265            **int:** The total number of unique ancestries.
266        """
267        return len(np.unique(self.__lai))
268    
269    @property
270    def n_haplotypes(self) -> int:
271        """
272        Retrieve `n_haplotypes`.
273
274        Returns:
275            **int:** The total number of haplotypes.
276        """
277        if self.__haplotypes is not None:
278            return len(self.__haplotypes)
279        else:
280            return self.__lai.shape[1]
281
282    @property
283    def n_windows(self) -> int:
284        """
285        Retrieve `n_windows`.
286
287        Returns:
288            **int:** The total number of genomic windows.
289        """
290        return self.__lai.shape[0]
291
292    def copy(self) -> 'LocalAncestryObject':
293        """
294        Create and return a copy of `self`.
295
296        Returns:
297            **LocalAncestryObject:** 
298                A new instance of the current object.
299        """
300        return copy.copy(self)
301
302    def keys(self) -> List[str]:
303        """
304        Retrieve a list of public attribute names for `self`.
305
306        Returns:
307            **list of str:** 
308                A list of attribute names, with internal name-mangling removed, 
309                for easier reference to public attributes in the instance.
310        """
311        return [attr.replace('_LocalAncestryObject__', '').replace('_AncestryObject__', '') for attr in vars(self)]
312
313    def filter_windows(
314            self,
315            indexes: Union[int, Sequence[int], np.ndarray],
316            include: bool = True,
317            inplace: bool = False
318        ) -> Optional['LocalAncestryObject']:
319        """
320        Filter genomic windows based on specified indexes. 
321
322        This method updates the `lai` attribute to include or exclude the specified genomic windows. 
323        Attributes such as `window_sizes`, `centimorgan_pos`, `chromosomes`, and `physical_pos` will also be 
324        updated accordingly if they are not None. The order of genomic windows is preserved.
325
326        Negative indexes are supported and follow 
327        [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 
328
329        Args:
330            indexes (int or array-like of int): 
331                Index(es) of the windows to include or exclude. Can be a single integer or a
332                sequence of integers. Negative indexes are supported.
333            include (bool, default=True): 
334                If True, includes only the specified windows. If False, excludes the specified
335                windows. Default is True.
336            inplace (bool, default=False): 
337                If True, modifies `self` in place. If False, returns a new `LocalAncestryObject` with 
338                the windows filtered. Default is False.
339
340        Returns:
341            **Optional[LocalAncestryObject]:** 
342                A new `LocalAncestryObject` with the specified windows filtered if `inplace=False`. 
343                If `inplace=True`, modifies `self` in place and returns None.
344        """
345        # Convert indexes to a NumPy array
346        indexes = np.atleast_1d(indexes)
347
348        # Get total number of windows
349        n_windows = self.n_windows
350
351        # Validate indexes, allowing negative indexes
352        if np.any((indexes < -n_windows) | (indexes >= n_windows)):
353            raise IndexError("One or more indexes are out of bounds.")
354
355        # Create boolean mask
356        mask = np.zeros(n_windows, dtype=bool)
357        mask[indexes] = True
358
359        # Invert mask if `include=False`
360        if not include:
361            mask = ~mask
362        
363        # Filter `lai`
364        filtered_lai = self['lai'][mask, :] 
365        
366        # Filter `window_sizes`, `chromosomes`, `centimorgan_pos`, and `physical_pos`, checking if they are None before filtering
367        filtered_window_sizes = self['window_sizes'][mask] if self['window_sizes'] is not None else None
368        filtered_chromosomes = self['chromosomes'][mask] if self['chromosomes'] is not None else None
369        filtered_centimorgan_pos = self['centimorgan_pos'][mask, :] if self['centimorgan_pos'] is not None else None
370        filtered_physical_pos = self['physical_pos'][mask, :] if self['physical_pos'] is not None else None
371
372        # Modify the original object if `inplace=True`, otherwise create and return a copy
373        if inplace:
374            self['lai'] = filtered_lai
375            if filtered_window_sizes is not None:
376                self['window_sizes'] = filtered_window_sizes
377            if filtered_chromosomes is not None:
378                self['chromosomes'] = filtered_chromosomes
379            if filtered_centimorgan_pos is not None:
380                self['centimorgan_pos'] = filtered_centimorgan_pos
381            if filtered_physical_pos is not None:
382                self['physical_pos'] = filtered_physical_pos
383            return None
384        else:
385            laiobj = self.copy()
386            laiobj['lai'] = filtered_lai
387            if filtered_window_sizes is not None:
388                laiobj['window_sizes'] = filtered_window_sizes
389            if filtered_chromosomes is not None:
390                laiobj['chromosomes'] = filtered_chromosomes
391            if filtered_centimorgan_pos is not None:
392                laiobj['centimorgan_pos'] = filtered_centimorgan_pos
393            if filtered_physical_pos is not None:
394                laiobj['physical_pos'] = filtered_physical_pos
395            return laiobj
396
397    def filter_samples(
398        self,
399        samples: Optional[Union[str, Sequence[str], np.ndarray, None]] = None,
400        indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None,
401        include: bool = True,
402        inplace: bool = False
403    ) -> Optional['LocalAncestryObject']:
404        """
405        Filter samples based on specified names or indexes.
406
407        This method updates the `lai`, `haplotypes`, and `samples` attributes to include or exclude the specified 
408        samples. Each sample is associated with two haplotypes, which are included or excluded together.
409        The order of the samples is preserved.
410
411        If both samples and indexes are provided, any sample matching either a name in samples or an index in 
412        indexes will be included or excluded.
413        
414        Negative indexes are supported and follow 
415        [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html).
416
417        Args:
418            samples (str or array_like of str, optional): 
419                 Name(s) of the samples to include or exclude. Can be a single sample name or a
420                 sequence of sample names. Default is None.
421            indexes (int or array_like of int, optional):
422                Index(es) of the samples to include or exclude. Can be a single index or a sequence
423                of indexes. Negative indexes are supported. Default is None.
424            include (bool, default=True): 
425                If True, includes only the specified samples. If False, excludes the specified
426                samples. Default is True.
427            inplace (bool, default=False): 
428                If True, modifies `self` in place. If False, returns a new `LocalAncestryObject` with the 
429                samples filtered. Default is False.
430
431        Returns:
432            **Optional[LocalAncestryObject]:** 
433                A new `LocalAncestryObject` with the specified samples filtered if `inplace=False`. 
434                If `inplace=True`, modifies `self` in place and returns None.
435        """
436        if samples is None and indexes is None:
437            raise UserWarning("At least one of 'samples' or 'indexes' must be provided.")
438
439        n_haplotypes = self.n_haplotypes
440        n_samples = self.n_samples
441
442        # Create mask based on sample names
443        if samples is not None:
444            samples = np.atleast_1d(samples)
445            # Extract sample names from haplotype identifiers
446            haplotype_ids = np.array(self['haplotypes'])
447            sample_names = np.array([hap.split('.')[0] for hap in haplotype_ids])
448            # Create mask for haplotypes belonging to specified samples
449            mask_samples = np.isin(sample_names, samples)
450        else:
451            mask_samples = np.zeros(n_haplotypes, dtype=bool)
452
453        # Create mask based on sample indexes
454        if indexes is not None:
455            indexes = np.atleast_1d(indexes)
456
457            # Validate indexes, allowing negative indexes
458            out_of_bounds_indexes = indexes[(indexes < -n_samples) | (indexes >= n_samples)]
459            if out_of_bounds_indexes.size > 0:
460                raise ValueError(f"One or more sample indexes are out of bounds.")
461
462            # Adjust negative indexes
463            indexes = np.mod(indexes, n_samples)
464            
465            # Get haplotype indexes for the specified sample indexes
466            haplotype_indexes = np.concatenate([2*indexes, 2*indexes+1])
467            # Create mask for haplotypes
468            mask_indexes = np.zeros(n_haplotypes, dtype=bool)
469            mask_indexes[haplotype_indexes] = True
470        else:
471            mask_indexes = np.zeros(n_haplotypes, dtype=bool)
472
473        # Combine masks using logical OR (union of samples)
474        mask_combined = mask_samples | mask_indexes
475
476        if not include:
477            mask_combined = ~mask_combined
478
479        # Filter `lai`
480        filtered_lai = self['lai'][:, mask_combined]
481
482        # Filter `haplotypes`
483        filtered_haplotypes = np.array(self['haplotypes'])[mask_combined].tolist()
484
485        # Filter `samples`, checking if they are None before filtering
486        sample_mask = mask_combined.reshape(-1, 2).any(axis=1)
487        filtered_samples = np.array(self['samples'])[sample_mask].tolist() if self['samples'] is not None else None
488
489        if inplace:
490            self['haplotypes'] = filtered_haplotypes
491            self['samples'] = filtered_samples
492            self['lai'] = filtered_lai
493            return None
494        else:
495            laiobj = self.copy()
496            laiobj['haplotypes'] = filtered_haplotypes
497            laiobj['samples'] = filtered_samples
498            laiobj['lai'] = filtered_lai
499            return laiobj
500
501    def convert_to_snp_level(
502        self,
503        snpobject: Optional['SNPObject'] = None,
504        variants_chrom: Optional[np.ndarray] = None,
505        variants_pos: Optional[np.ndarray] = None,
506        variants_ref: Optional[np.ndarray] = None,
507        variants_alt: Optional[np.ndarray] = None,
508        variants_filter_pass: Optional[np.ndarray] = None,
509        variants_id: Optional[np.ndarray] = None,
510        variants_qual: Optional[np.ndarray] = None,
511        lai_format: str = "3D"
512    ) -> 'SNPObject':
513        """
514        Convert `self` into a `snputils.snp.genobj.SNPObject` SNP-level Local Ancestry Information (LAI), 
515        with optional support for SNP data.
516        
517        If SNP positions (`variants_pos`) and/or chromosomes (`variants_chrom`) are not specified, the method generates 
518        SNPs uniformly across the start and end positions of each genomic window. Otherwise, the provided SNP 
519        coordinates are used to assign ancestry values based on their respective windows.
520
521        If a `SNPObject` is provided, its attributes are used unless explicitly overridden by the function arguments.
522        In that case, the SNPObject is updated with the (optional) new attributes and the computed `calldata_lai`, then returned.
523
524        Args:
525            snpobject (SNPObject, optional):
526                An existing `SNPObject` to extract SNP attributes from.
527            variants_chrom (array of shape (n_snps,), optional): 
528                An array containing the chromosome for each SNP.
529            variants_pos (array of shape (n_snps,), optional): 
530                An array containing the chromosomal positions for each SNP.
531            variants_ref (array of shape (n_snps,), optional): 
532                An array containing the reference allele for each SNP.
533            variants_alt (array of shape (n_snps,), optional): 
534                An array containing the alternate allele for each SNP.
535            variants_filter_pass (array of shape (n_snps,), optional): 
536                An array indicating whether each SNP passed control checks.
537            variants_id (array of shape (n_snps,), optional): 
538                An array containing unique identifiers (IDs) for each SNP.
539            variants_qual (array of shape (n_snps,), optional): 
540                An array containing the Phred-scaled quality score for each SNP.
541            lai_format (str, optional):
542                Determines the shape of `calldata_lai`:
543                    - `"3D"` (default): Shape `(n_snps, n_samples, 2)`.
544                    - `"2D"`: Shape `(n_snps, n_samples * 2)`.
545
546        Returns:
547            **SNPObject**: 
548                A `SNPObject` containing SNP-level ancestry data and updated SNP attributes.
549        """
550        from snputils.snp.genobj.snpobj import SNPObject
551
552        assert lai_format in {"2D", "3D"}, "Invalid `lai_format`. Must be '2D' or '3D'."
553
554        # Extract attributes from SNPObject if provided
555        if snpobject is not None:
556            variants_chrom = variants_chrom or snpobject.variants_chrom
557            variants_pos = variants_pos or snpobject.variants_pos
558            variants_ref = variants_ref or snpobject.variants_ref
559            variants_alt = variants_alt or snpobject.variants_alt
560            variants_filter_pass = variants_filter_pass or snpobject.variants_filter_pass
561            variants_id = variants_id or snpobject.variants_id
562            variants_qual = variants_qual or snpobject.variants_qual
563
564        n_samples = self.n_samples
565        lai_reshaped = self.lai.reshape(self.n_windows, n_samples, 2).astype(int) if lai_format == "3D" else None
566
567        if variants_pos is None or variants_chrom is None:
568            # Generate all SNP positions and corresponding chromosome labels between window boundaries
569            variants_pos_list = []
570            variants_chrom_list = []
571            ancestry_list = []
572
573            for i in range(self.n_windows):
574                start = int(self.physical_pos[i, 0])
575                end = int(self.physical_pos[i, 1])
576                chrom = self.chromosomes[i]
577
578                # Generate SNP positions at each base pair within the window range
579                positions_in_window = np.arange(start, end + 1)
580                if positions_in_window.size == 0:
581                    continue  # Skip windows that contain no valid SNP positions
582
583                n_positions = positions_in_window.size
584                variants_pos_list.append(positions_in_window)
585                variants_chrom_list.append(np.full(n_positions, chrom))
586
587                ancestry_repeated = (
588                    np.repeat(lai_reshaped[i, np.newaxis, :, :], n_positions, axis=0)
589                    if lai_format == "3D" else np.repeat(self.lai[i, np.newaxis, :], n_positions, axis=0)
590                )
591                ancestry_list.append(ancestry_repeated)
592
593            # Store SNP positions, their corresponding chromosome labels, and their associated ancestry
594            variants_pos = np.concatenate(variants_pos_list)
595            variants_chrom = np.concatenate(variants_chrom_list)
596            calldata_lai = np.concatenate(ancestry_list)
597        else:
598            # Use the provided SNP positions and chromosomes
599            n_snps = len(variants_pos)
600            if len(variants_chrom) != n_snps:
601                raise ValueError("`variants_pos` and `variants_chrom` must have the same length.")
602
603            # Initialize an array to store the corresponding window index for each SNP
604            # Default value is -1, meaning no matching window found
605            snp_to_window_indices = np.full(n_snps, -1, dtype=int)
606
607            # Identify unique chromosome names sorted by order of appearence
608            _, idx = np.unique(variants_chrom, return_index=True)
609            unique_chroms = variants_chrom[np.sort(idx)]
610
611            # Iterate through each unique chromosome to map SNPs to windows
612            for chrom in unique_chroms:
613                # Get indices of SNPs that belong to the current chromosome
614                snp_indices = np.where(variants_chrom == chrom)[0]
615                snp_pos_chr = variants_pos[snp_indices]
616                
617                # Get indices of windows that belong to the current chromosome
618                window_indices = np.where(self.chromosomes == chrom)[0]
619                if window_indices.size == 0:
620                    continue  # Skip if no windows exist for this chromosome
621                
622                # Extract start and end positions of the windows on this chromosome
623                window_starts_chr = self.physical_pos[:, 0][window_indices]
624                window_ends_chr = self.physical_pos[:, 1][window_indices]
625                
626                # Find the right-most window that a SNP would fit into (sorted order)
627                inds = np.searchsorted(window_starts_chr, snp_pos_chr, side='right') - 1
628                
629                # Mask valid SNPs: ensure they are within a valid range and fall inside window boundaries
630                valid_mask = (inds >= 0) & (inds < len(window_starts_chr)) & (snp_pos_chr <= window_ends_chr[inds])
631
632                # Assign valid SNPs to their corresponding window indices
633                snp_to_window_indices[snp_indices[valid_mask]] = window_indices[inds[valid_mask]]
634                log.debug(f"Number of SNPs within window ranges for chromosome {chrom}: {valid_mask.sum()}")
635
636            # Initialize SNP-level ancestry array with NaNs as default
637            shape = (n_snps, n_samples, 2) if lai_format == "3D" else (n_snps, n_samples * 2)
638            calldata_lai = np.full(shape, np.nan, dtype='uint8')
639
640            # Assign ancestry values to SNPs with valid window assignments
641            valid_mask = (snp_to_window_indices != -1)
642            snp_indices = np.where(valid_mask)[0]
643            snp_to_window_indices = snp_to_window_indices[snp_indices]
644
645            if lai_format == "3D":
646                calldata_lai[snp_indices] = lai_reshaped[snp_to_window_indices]
647            else:  # "2D"
648                calldata_lai[snp_indices] = self.lai[snp_to_window_indices]
649
650        if snpobject is not None:
651            # If a SNPObject was provided, update its attributes with any new values and add `calldata_lai``
652            snpobject.variants_chrom = variants_chrom
653            snpobject.variants_pos = variants_pos
654            snpobject.variants_ref = variants_ref
655            snpobject.variants_alt = variants_alt
656            snpobject.variants_filter_pass = variants_filter_pass
657            snpobject.variants_id = variants_id
658            snpobject.variants_qual = variants_qual
659            snpobject.calldata_lai = calldata_lai
660            snpobject.ancestry_map = self.ancestry_map
661            return snpobject
662        else:
663            # Otherwise, create a new SNPObject
664            return SNPObject(
665                calldata_lai=calldata_lai.view(),
666                samples=self.samples,
667                variants_ref=variants_ref.view() if isinstance(variants_ref, np.ndarray) else variants_ref,
668                variants_alt=variants_alt.view() if isinstance(variants_alt, np.ndarray) else variants_alt,
669                variants_filter_pass=variants_filter_pass.view() if isinstance(variants_filter_pass, np.ndarray) else variants_filter_pass,
670                variants_chrom=variants_chrom.view(),
671                variants_id=variants_id.view() if isinstance(variants_id, np.ndarray) else variants_id,
672                variants_pos=variants_pos.view(),
673                variants_qual=variants_qual.view() if isinstance(variants_qual, np.ndarray) else variants_qual,
674                ancestry_map=self.ancestry_map
675            )
676
677    def _sanity_check(self) -> None:
678        """
679        Perform sanity checks on the parsed data to ensure data integrity.
680
681        This method checks that all unique ancestries in LAI are represented 
682        in the ancestry map.
683
684        Args:
685            lai (np.ndarray): The LAI data array.
686            ancestry_map (dict, optional): A dictionary mapping ancestry codes to region names, if available.
687        """
688        # Get unique ancestries from LAI data
689        unique_ancestries = np.unique(self.lai)
690
691        if self.ancestry_map is not None:
692            # Check if all unique ancestries in the LAI are present in the ancestry map
693            for ancestry in unique_ancestries:
694                if str(ancestry) not in self.ancestry_map:
695                    warnings.warn(
696                        f"Ancestry '{ancestry}' found in LAI data is not represented in the ancestry map."
697                    )
698
699    def save(self, file: Union[str, Path]) -> None:
700        """
701        Save the data stored in `self` to a specified file.
702        If the file already exists, it will be overwritten.
703
704        The format of the saved file is determined by the file extension provided in the `file` 
705        argument.
706
707        **Supported formats:**
708
709        - `.msp`: Text-based MSP format.
710        - `.msp.tsv`: Text-based MSP format with TSV extension.
711        - `.pkl`: Pickle format for saving `self` in serialized form.
712
713        Args:
714            file (str or pathlib.Path): 
715                Path to the file where the data will be saved. The extension of the file determines the save format. 
716                Supported extensions: `.msp`, `.msp.tsv`, `.pkl`.
717        """
718        path = Path(file)
719        suffixes = [suffix.lower() for suffix in path.suffixes]
720
721        if suffixes[-2:] == ['.msp', '.tsv'] or suffixes[-1] == '.msp':
722            self.save_msp(file)
723        elif suffixes[-1] == '.pkl':
724            self.save_pickle(file)
725        else:
726            raise ValueError(
727                f"Unsupported file extension: {suffixes[-1]}"
728                "Supported extensions are: .msp, .msp.tsv, .pkl."
729            )
730
731    def save_msp(self, file: Union[str, Path]) -> None:
732        """
733        Save the data stored in `self` to a `.msp` file.
734        If the file already exists, it will be overwritten.
735
736        Args:
737            file (str or pathlib.Path): 
738                Path to the file where the data will be saved. It should end with `.msp` or `.msp.tsv`. 
739                If the provided path does not have one of these extensions, the `.msp` extension will be appended.
740        """
741        from snputils.ancestry.io.local.write import MSPWriter
742
743        MSPWriter(self, file).write()
744
745    def save_pickle(self, file: Union[str, Path]) -> None:
746        """
747        Save `self` in serialized form to a `.pkl` file.
748        If the file already exists, it will be overwritten.
749
750        Args:
751            file (str or pathlib.Path): 
752                Path to the file where the data will be saved. It should end with `.pkl`. 
753                If the provided path does not have this extension, it will be appended.
754        """
755        import pickle
756        with open(file, 'wb') as file:
757            pickle.dump(self, file)

A class for window-level Local Ancestry Inference (LAI) data.

LocalAncestryObject( haplotypes: List[str], lai: numpy.ndarray, samples: Optional[List[str]] = None, ancestry_map: Optional[Dict[str, str]] = None, window_sizes: Optional[numpy.ndarray] = None, centimorgan_pos: Optional[numpy.ndarray] = None, chromosomes: Optional[numpy.ndarray] = None, physical_pos: Optional[numpy.ndarray] = None)
21    def __init__(
22        self,
23        haplotypes: List[str], 
24        lai: np.ndarray,
25        samples: Optional[List[str]] = None, 
26        ancestry_map: Optional[Dict[str, str]] = None, 
27        window_sizes: Optional[np.ndarray] = None,
28        centimorgan_pos: Optional[np.ndarray] = None,
29        chromosomes: Optional[np.ndarray] = None,
30        physical_pos: Optional[np.ndarray] = None
31    ) -> None:
32        """
33        Args:
34            haplotypes (list of str of length n_haplotypes):
35                A list of unique haplotype identifiers.
36            lai (array of shape (n_windows, n_haplotypes)): 
37                A 2D array containing local ancestry inference values, where each row represents a 
38                genomic window, and each column corresponds to a haplotype phase for each sample.
39            samples (list of str of length n_samples, optional):
40                A list of unique sample identifiers.
41            ancestry_map (dict of str to str, optional):
42                A dictionary mapping ancestry codes to region names.
43            window_sizes (array of shape (n_windows,), optional): 
44                An array specifying the number of SNPs in each genomic window.
45            centimorgan_pos (array of shape (n_windows, 2), optional): 
46                A 2D array containing the start and end centimorgan positions for each window.
47            chromosomes (array of shape (n_windows,), optional): 
48                An array with chromosome numbers corresponding to each genomic window.
49            physical_pos (array of shape (n_windows, 2), optional): 
50                A 2D array containing the start and end physical positions for each window.
51        """
52        if lai.ndim != 2:
53            raise ValueError("`lai` must be a 2D array with shape (n_windows, n_haplotypes).")
54        
55        # Determine the number of unique ancestries and samples from the LAI array
56        n_ancestries = len(np.unique(lai))
57        n_haplotypes = lai.shape[1]
58        n_samples = n_haplotypes // 2
59
60        super(LocalAncestryObject, self).__init__(n_samples, n_ancestries)
61
62        self.__haplotypes = haplotypes
63        self.__lai = lai
64        self.__window_sizes = window_sizes
65        self.__centimorgan_pos = centimorgan_pos
66        self.__samples = samples
67        self.__chromosomes = chromosomes
68        self.__physical_pos = physical_pos
69        self.__ancestry_map = ancestry_map
70
71        # Perform sanity check to ensure all unique ancestries in LAI data are represented in the ancestry map
72        self._sanity_check()
Arguments:
  • haplotypes (list of str of length n_haplotypes): A list of unique haplotype identifiers.
  • lai (array of shape (n_windows, n_haplotypes)): A 2D array containing local ancestry inference values, where each row represents a genomic window, and each column corresponds to a haplotype phase for each sample.
  • samples (list of str of length n_samples, optional): A list of unique sample identifiers.
  • ancestry_map (dict of str to str, optional): A dictionary mapping ancestry codes to region names.
  • window_sizes (array of shape (n_windows,), optional): An array specifying the number of SNPs in each genomic window.
  • centimorgan_pos (array of shape (n_windows, 2), optional): A 2D array containing the start and end centimorgan positions for each window.
  • chromosomes (array of shape (n_windows,), optional): An array with chromosome numbers corresponding to each genomic window.
  • physical_pos (array of shape (n_windows, 2), optional): A 2D array containing the start and end physical positions for each window.
haplotypes: List[str]
 94    @property
 95    def haplotypes(self) -> List[str]:
 96        """
 97        Retrieve `haplotypes`.
 98
 99        Returns:
100            **list of length n_haplotypes:** A list of unique haplotype identifiers.
101        """
102        return self.__haplotypes

Retrieve haplotypes.

Returns:

list of length n_haplotypes: A list of unique haplotype identifiers.

lai: numpy.ndarray
111    @property
112    def lai(self) -> np.ndarray:
113        """
114        Retrieve `lai`.
115
116        Returns:
117            **array of shape (n_windows, n_haplotypes):** 
118                A 2D array containing local ancestry inference values, where each row represents a 
119                genomic window, and each column corresponds to a haplotype phase for each sample.
120        """
121        return self.__lai

Retrieve lai.

Returns:

array of shape (n_windows, n_haplotypes): A 2D array containing local ancestry inference values, where each row represents a genomic window, and each column corresponds to a haplotype phase for each sample.

samples: Optional[List[str]]
130    @property
131    def samples(self) -> Optional[List[str]]:
132        """
133        Retrieve `samples`.
134
135        Returns:
136            **list of str:** A list of unique sample identifiers.
137        """
138        if self.__samples is not None:
139            return self.__samples
140        elif self.__haplotypes is not None:
141            return [hap.split('.')[0] for hap in self.__haplotypes][::2]
142        else:
143            return None

Retrieve samples.

Returns:

list of str: A list of unique sample identifiers.

ancestry_map: Optional[Dict[str, str]]
152    @property
153    def ancestry_map(self) -> Optional[Dict[str, str]]:
154        """
155        Retrieve `ancestry_map`.
156
157        Returns:
158            **dict of str to str:** A dictionary mapping ancestry codes to region names.
159        """
160        return self.__ancestry_map

Retrieve ancestry_map.

Returns:

dict of str to str: A dictionary mapping ancestry codes to region names.

window_sizes: Optional[numpy.ndarray]
169    @property
170    def window_sizes(self) -> Optional[np.ndarray]:
171        """
172        Retrieve `window_sizes`.
173
174        Returns:
175            **array of shape (n_windows,):** 
176                An array specifying the number of SNPs in each genomic window.
177        """
178        return self.__window_sizes

Retrieve window_sizes.

Returns:

array of shape (n_windows,): An array specifying the number of SNPs in each genomic window.

centimorgan_pos: Optional[numpy.ndarray]
187    @property
188    def centimorgan_pos(self) -> Optional[np.ndarray]:
189        """
190        Retrieve `centimorgan_pos`.
191
192        Returns:
193            **array of shape (n_windows, 2):** 
194                A 2D array containing the start and end centimorgan positions for each window.
195        """
196        return self.__centimorgan_pos

Retrieve centimorgan_pos.

Returns:

array of shape (n_windows, 2): A 2D array containing the start and end centimorgan positions for each window.

chromosomes: Optional[numpy.ndarray]
205    @property
206    def chromosomes(self) -> Optional[np.ndarray]:
207        """
208        Retrieve `chromosomes`.
209
210        Returns:
211            **array of shape (n_windows,):** 
212                An array with chromosome numbers corresponding to each genomic window.
213        """
214        return self.__chromosomes

Retrieve chromosomes.

Returns:

array of shape (n_windows,): An array with chromosome numbers corresponding to each genomic window.

physical_pos: Optional[numpy.ndarray]
223    @property
224    def physical_pos(self) -> Optional[np.ndarray]:
225        """
226        Retrieve `physical_pos`.
227
228        Returns:
229            **array of shape (n_windows, 2):** 
230                A 2D array containing the start and end physical positions for each window.
231        """
232        return self.__physical_pos

Retrieve physical_pos.

Returns:

array of shape (n_windows, 2): A 2D array containing the start and end physical positions for each window.

n_samples: int
241    @property
242    def n_samples(self) -> int:
243        """
244        Retrieve `n_samples`.
245
246        Returns:
247            **int:** 
248                The total number of samples.
249        """
250        if self.__samples is not None:
251            return len(self.__samples)
252        elif self.__haplotypes is not None:
253            # Divide by 2 because each sample has two associated haplotypes
254            return len(self.__haplotypes) // 2
255        else:
256            # Divide by 2 because columns represent haplotypes
257            return self.__lai.shape[1] // 2

Retrieve n_samples.

Returns:

int: The total number of samples.

n_ancestries: int
259    @property
260    def n_ancestries(self) -> int:
261        """
262        Retrieve `n_ancestries`.
263
264        Returns:
265            **int:** The total number of unique ancestries.
266        """
267        return len(np.unique(self.__lai))

Retrieve n_ancestries.

Returns:

int: The total number of unique ancestries.

n_haplotypes: int
269    @property
270    def n_haplotypes(self) -> int:
271        """
272        Retrieve `n_haplotypes`.
273
274        Returns:
275            **int:** The total number of haplotypes.
276        """
277        if self.__haplotypes is not None:
278            return len(self.__haplotypes)
279        else:
280            return self.__lai.shape[1]

Retrieve n_haplotypes.

Returns:

int: The total number of haplotypes.

n_windows: int
282    @property
283    def n_windows(self) -> int:
284        """
285        Retrieve `n_windows`.
286
287        Returns:
288            **int:** The total number of genomic windows.
289        """
290        return self.__lai.shape[0]

Retrieve n_windows.

Returns:

int: The total number of genomic windows.

def copy(self) -> LocalAncestryObject:
292    def copy(self) -> 'LocalAncestryObject':
293        """
294        Create and return a copy of `self`.
295
296        Returns:
297            **LocalAncestryObject:** 
298                A new instance of the current object.
299        """
300        return copy.copy(self)

Create and return a copy of self.

Returns:

LocalAncestryObject: A new instance of the current object.

def keys(self) -> List[str]:
302    def keys(self) -> List[str]:
303        """
304        Retrieve a list of public attribute names for `self`.
305
306        Returns:
307            **list of str:** 
308                A list of attribute names, with internal name-mangling removed, 
309                for easier reference to public attributes in the instance.
310        """
311        return [attr.replace('_LocalAncestryObject__', '').replace('_AncestryObject__', '') for attr in vars(self)]

Retrieve a list of public attribute names for self.

Returns:

list of str: A list of attribute names, with internal name-mangling removed, for easier reference to public attributes in the instance.

def filter_windows( self, indexes: Union[int, Sequence[int], numpy.ndarray], include: bool = True, inplace: bool = False) -> Optional[LocalAncestryObject]:
313    def filter_windows(
314            self,
315            indexes: Union[int, Sequence[int], np.ndarray],
316            include: bool = True,
317            inplace: bool = False
318        ) -> Optional['LocalAncestryObject']:
319        """
320        Filter genomic windows based on specified indexes. 
321
322        This method updates the `lai` attribute to include or exclude the specified genomic windows. 
323        Attributes such as `window_sizes`, `centimorgan_pos`, `chromosomes`, and `physical_pos` will also be 
324        updated accordingly if they are not None. The order of genomic windows is preserved.
325
326        Negative indexes are supported and follow 
327        [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 
328
329        Args:
330            indexes (int or array-like of int): 
331                Index(es) of the windows to include or exclude. Can be a single integer or a
332                sequence of integers. Negative indexes are supported.
333            include (bool, default=True): 
334                If True, includes only the specified windows. If False, excludes the specified
335                windows. Default is True.
336            inplace (bool, default=False): 
337                If True, modifies `self` in place. If False, returns a new `LocalAncestryObject` with 
338                the windows filtered. Default is False.
339
340        Returns:
341            **Optional[LocalAncestryObject]:** 
342                A new `LocalAncestryObject` with the specified windows filtered if `inplace=False`. 
343                If `inplace=True`, modifies `self` in place and returns None.
344        """
345        # Convert indexes to a NumPy array
346        indexes = np.atleast_1d(indexes)
347
348        # Get total number of windows
349        n_windows = self.n_windows
350
351        # Validate indexes, allowing negative indexes
352        if np.any((indexes < -n_windows) | (indexes >= n_windows)):
353            raise IndexError("One or more indexes are out of bounds.")
354
355        # Create boolean mask
356        mask = np.zeros(n_windows, dtype=bool)
357        mask[indexes] = True
358
359        # Invert mask if `include=False`
360        if not include:
361            mask = ~mask
362        
363        # Filter `lai`
364        filtered_lai = self['lai'][mask, :] 
365        
366        # Filter `window_sizes`, `chromosomes`, `centimorgan_pos`, and `physical_pos`, checking if they are None before filtering
367        filtered_window_sizes = self['window_sizes'][mask] if self['window_sizes'] is not None else None
368        filtered_chromosomes = self['chromosomes'][mask] if self['chromosomes'] is not None else None
369        filtered_centimorgan_pos = self['centimorgan_pos'][mask, :] if self['centimorgan_pos'] is not None else None
370        filtered_physical_pos = self['physical_pos'][mask, :] if self['physical_pos'] is not None else None
371
372        # Modify the original object if `inplace=True`, otherwise create and return a copy
373        if inplace:
374            self['lai'] = filtered_lai
375            if filtered_window_sizes is not None:
376                self['window_sizes'] = filtered_window_sizes
377            if filtered_chromosomes is not None:
378                self['chromosomes'] = filtered_chromosomes
379            if filtered_centimorgan_pos is not None:
380                self['centimorgan_pos'] = filtered_centimorgan_pos
381            if filtered_physical_pos is not None:
382                self['physical_pos'] = filtered_physical_pos
383            return None
384        else:
385            laiobj = self.copy()
386            laiobj['lai'] = filtered_lai
387            if filtered_window_sizes is not None:
388                laiobj['window_sizes'] = filtered_window_sizes
389            if filtered_chromosomes is not None:
390                laiobj['chromosomes'] = filtered_chromosomes
391            if filtered_centimorgan_pos is not None:
392                laiobj['centimorgan_pos'] = filtered_centimorgan_pos
393            if filtered_physical_pos is not None:
394                laiobj['physical_pos'] = filtered_physical_pos
395            return laiobj

Filter genomic windows based on specified indexes.

This method updates the lai attribute to include or exclude the specified genomic windows. Attributes such as window_sizes, centimorgan_pos, chromosomes, and physical_pos will also be updated accordingly if they are not None. The order of genomic windows is preserved.

Negative indexes are supported and follow NumPy's indexing conventions.

Arguments:
  • indexes (int or array-like of int): Index(es) of the windows to include or exclude. Can be a single integer or a sequence of integers. Negative indexes are supported.
  • include (bool, default=True): If True, includes only the specified windows. If False, excludes the specified windows. Default is True.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new LocalAncestryObject with the windows filtered. Default is False.
Returns:

Optional[LocalAncestryObject]: A new LocalAncestryObject with the specified windows filtered if inplace=False. If inplace=True, modifies self in place and returns None.

def filter_samples( self, samples: Union[str, Sequence[str], numpy.ndarray, NoneType] = None, indexes: Union[int, Sequence[int], numpy.ndarray, NoneType] = None, include: bool = True, inplace: bool = False) -> Optional[LocalAncestryObject]:
397    def filter_samples(
398        self,
399        samples: Optional[Union[str, Sequence[str], np.ndarray, None]] = None,
400        indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None,
401        include: bool = True,
402        inplace: bool = False
403    ) -> Optional['LocalAncestryObject']:
404        """
405        Filter samples based on specified names or indexes.
406
407        This method updates the `lai`, `haplotypes`, and `samples` attributes to include or exclude the specified 
408        samples. Each sample is associated with two haplotypes, which are included or excluded together.
409        The order of the samples is preserved.
410
411        If both samples and indexes are provided, any sample matching either a name in samples or an index in 
412        indexes will be included or excluded.
413        
414        Negative indexes are supported and follow 
415        [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html).
416
417        Args:
418            samples (str or array_like of str, optional): 
419                 Name(s) of the samples to include or exclude. Can be a single sample name or a
420                 sequence of sample names. Default is None.
421            indexes (int or array_like of int, optional):
422                Index(es) of the samples to include or exclude. Can be a single index or a sequence
423                of indexes. Negative indexes are supported. Default is None.
424            include (bool, default=True): 
425                If True, includes only the specified samples. If False, excludes the specified
426                samples. Default is True.
427            inplace (bool, default=False): 
428                If True, modifies `self` in place. If False, returns a new `LocalAncestryObject` with the 
429                samples filtered. Default is False.
430
431        Returns:
432            **Optional[LocalAncestryObject]:** 
433                A new `LocalAncestryObject` with the specified samples filtered if `inplace=False`. 
434                If `inplace=True`, modifies `self` in place and returns None.
435        """
436        if samples is None and indexes is None:
437            raise UserWarning("At least one of 'samples' or 'indexes' must be provided.")
438
439        n_haplotypes = self.n_haplotypes
440        n_samples = self.n_samples
441
442        # Create mask based on sample names
443        if samples is not None:
444            samples = np.atleast_1d(samples)
445            # Extract sample names from haplotype identifiers
446            haplotype_ids = np.array(self['haplotypes'])
447            sample_names = np.array([hap.split('.')[0] for hap in haplotype_ids])
448            # Create mask for haplotypes belonging to specified samples
449            mask_samples = np.isin(sample_names, samples)
450        else:
451            mask_samples = np.zeros(n_haplotypes, dtype=bool)
452
453        # Create mask based on sample indexes
454        if indexes is not None:
455            indexes = np.atleast_1d(indexes)
456
457            # Validate indexes, allowing negative indexes
458            out_of_bounds_indexes = indexes[(indexes < -n_samples) | (indexes >= n_samples)]
459            if out_of_bounds_indexes.size > 0:
460                raise ValueError(f"One or more sample indexes are out of bounds.")
461
462            # Adjust negative indexes
463            indexes = np.mod(indexes, n_samples)
464            
465            # Get haplotype indexes for the specified sample indexes
466            haplotype_indexes = np.concatenate([2*indexes, 2*indexes+1])
467            # Create mask for haplotypes
468            mask_indexes = np.zeros(n_haplotypes, dtype=bool)
469            mask_indexes[haplotype_indexes] = True
470        else:
471            mask_indexes = np.zeros(n_haplotypes, dtype=bool)
472
473        # Combine masks using logical OR (union of samples)
474        mask_combined = mask_samples | mask_indexes
475
476        if not include:
477            mask_combined = ~mask_combined
478
479        # Filter `lai`
480        filtered_lai = self['lai'][:, mask_combined]
481
482        # Filter `haplotypes`
483        filtered_haplotypes = np.array(self['haplotypes'])[mask_combined].tolist()
484
485        # Filter `samples`, checking if they are None before filtering
486        sample_mask = mask_combined.reshape(-1, 2).any(axis=1)
487        filtered_samples = np.array(self['samples'])[sample_mask].tolist() if self['samples'] is not None else None
488
489        if inplace:
490            self['haplotypes'] = filtered_haplotypes
491            self['samples'] = filtered_samples
492            self['lai'] = filtered_lai
493            return None
494        else:
495            laiobj = self.copy()
496            laiobj['haplotypes'] = filtered_haplotypes
497            laiobj['samples'] = filtered_samples
498            laiobj['lai'] = filtered_lai
499            return laiobj

Filter samples based on specified names or indexes.

This method updates the lai, haplotypes, and samples attributes to include or exclude the specified samples. Each sample is associated with two haplotypes, which are included or excluded together. The order of the samples is preserved.

If both samples and indexes are provided, any sample matching either a name in samples or an index in indexes will be included or excluded.

Negative indexes are supported and follow NumPy's indexing conventions.

Arguments:
  • samples (str or array_like of str, optional): Name(s) of the samples to include or exclude. Can be a single sample name or a sequence of sample names. Default is None.
  • indexes (int or array_like of int, optional): Index(es) of the samples to include or exclude. Can be a single index or a sequence of indexes. Negative indexes are supported. Default is None.
  • include (bool, default=True): If True, includes only the specified samples. If False, excludes the specified samples. Default is True.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new LocalAncestryObject with the samples filtered. Default is False.
Returns:

Optional[LocalAncestryObject]: A new LocalAncestryObject with the specified samples filtered if inplace=False. If inplace=True, modifies self in place and returns None.

def convert_to_snp_level( self, snpobject: Optional[snputils.snp.genobj.SNPObject] = None, variants_chrom: Optional[numpy.ndarray] = None, variants_pos: Optional[numpy.ndarray] = None, variants_ref: Optional[numpy.ndarray] = None, variants_alt: Optional[numpy.ndarray] = None, variants_filter_pass: Optional[numpy.ndarray] = None, variants_id: Optional[numpy.ndarray] = None, variants_qual: Optional[numpy.ndarray] = None, lai_format: str = '3D') -> snputils.snp.genobj.SNPObject:
501    def convert_to_snp_level(
502        self,
503        snpobject: Optional['SNPObject'] = None,
504        variants_chrom: Optional[np.ndarray] = None,
505        variants_pos: Optional[np.ndarray] = None,
506        variants_ref: Optional[np.ndarray] = None,
507        variants_alt: Optional[np.ndarray] = None,
508        variants_filter_pass: Optional[np.ndarray] = None,
509        variants_id: Optional[np.ndarray] = None,
510        variants_qual: Optional[np.ndarray] = None,
511        lai_format: str = "3D"
512    ) -> 'SNPObject':
513        """
514        Convert `self` into a `snputils.snp.genobj.SNPObject` SNP-level Local Ancestry Information (LAI), 
515        with optional support for SNP data.
516        
517        If SNP positions (`variants_pos`) and/or chromosomes (`variants_chrom`) are not specified, the method generates 
518        SNPs uniformly across the start and end positions of each genomic window. Otherwise, the provided SNP 
519        coordinates are used to assign ancestry values based on their respective windows.
520
521        If a `SNPObject` is provided, its attributes are used unless explicitly overridden by the function arguments.
522        In that case, the SNPObject is updated with the (optional) new attributes and the computed `calldata_lai`, then returned.
523
524        Args:
525            snpobject (SNPObject, optional):
526                An existing `SNPObject` to extract SNP attributes from.
527            variants_chrom (array of shape (n_snps,), optional): 
528                An array containing the chromosome for each SNP.
529            variants_pos (array of shape (n_snps,), optional): 
530                An array containing the chromosomal positions for each SNP.
531            variants_ref (array of shape (n_snps,), optional): 
532                An array containing the reference allele for each SNP.
533            variants_alt (array of shape (n_snps,), optional): 
534                An array containing the alternate allele for each SNP.
535            variants_filter_pass (array of shape (n_snps,), optional): 
536                An array indicating whether each SNP passed control checks.
537            variants_id (array of shape (n_snps,), optional): 
538                An array containing unique identifiers (IDs) for each SNP.
539            variants_qual (array of shape (n_snps,), optional): 
540                An array containing the Phred-scaled quality score for each SNP.
541            lai_format (str, optional):
542                Determines the shape of `calldata_lai`:
543                    - `"3D"` (default): Shape `(n_snps, n_samples, 2)`.
544                    - `"2D"`: Shape `(n_snps, n_samples * 2)`.
545
546        Returns:
547            **SNPObject**: 
548                A `SNPObject` containing SNP-level ancestry data and updated SNP attributes.
549        """
550        from snputils.snp.genobj.snpobj import SNPObject
551
552        assert lai_format in {"2D", "3D"}, "Invalid `lai_format`. Must be '2D' or '3D'."
553
554        # Extract attributes from SNPObject if provided
555        if snpobject is not None:
556            variants_chrom = variants_chrom or snpobject.variants_chrom
557            variants_pos = variants_pos or snpobject.variants_pos
558            variants_ref = variants_ref or snpobject.variants_ref
559            variants_alt = variants_alt or snpobject.variants_alt
560            variants_filter_pass = variants_filter_pass or snpobject.variants_filter_pass
561            variants_id = variants_id or snpobject.variants_id
562            variants_qual = variants_qual or snpobject.variants_qual
563
564        n_samples = self.n_samples
565        lai_reshaped = self.lai.reshape(self.n_windows, n_samples, 2).astype(int) if lai_format == "3D" else None
566
567        if variants_pos is None or variants_chrom is None:
568            # Generate all SNP positions and corresponding chromosome labels between window boundaries
569            variants_pos_list = []
570            variants_chrom_list = []
571            ancestry_list = []
572
573            for i in range(self.n_windows):
574                start = int(self.physical_pos[i, 0])
575                end = int(self.physical_pos[i, 1])
576                chrom = self.chromosomes[i]
577
578                # Generate SNP positions at each base pair within the window range
579                positions_in_window = np.arange(start, end + 1)
580                if positions_in_window.size == 0:
581                    continue  # Skip windows that contain no valid SNP positions
582
583                n_positions = positions_in_window.size
584                variants_pos_list.append(positions_in_window)
585                variants_chrom_list.append(np.full(n_positions, chrom))
586
587                ancestry_repeated = (
588                    np.repeat(lai_reshaped[i, np.newaxis, :, :], n_positions, axis=0)
589                    if lai_format == "3D" else np.repeat(self.lai[i, np.newaxis, :], n_positions, axis=0)
590                )
591                ancestry_list.append(ancestry_repeated)
592
593            # Store SNP positions, their corresponding chromosome labels, and their associated ancestry
594            variants_pos = np.concatenate(variants_pos_list)
595            variants_chrom = np.concatenate(variants_chrom_list)
596            calldata_lai = np.concatenate(ancestry_list)
597        else:
598            # Use the provided SNP positions and chromosomes
599            n_snps = len(variants_pos)
600            if len(variants_chrom) != n_snps:
601                raise ValueError("`variants_pos` and `variants_chrom` must have the same length.")
602
603            # Initialize an array to store the corresponding window index for each SNP
604            # Default value is -1, meaning no matching window found
605            snp_to_window_indices = np.full(n_snps, -1, dtype=int)
606
607            # Identify unique chromosome names sorted by order of appearence
608            _, idx = np.unique(variants_chrom, return_index=True)
609            unique_chroms = variants_chrom[np.sort(idx)]
610
611            # Iterate through each unique chromosome to map SNPs to windows
612            for chrom in unique_chroms:
613                # Get indices of SNPs that belong to the current chromosome
614                snp_indices = np.where(variants_chrom == chrom)[0]
615                snp_pos_chr = variants_pos[snp_indices]
616                
617                # Get indices of windows that belong to the current chromosome
618                window_indices = np.where(self.chromosomes == chrom)[0]
619                if window_indices.size == 0:
620                    continue  # Skip if no windows exist for this chromosome
621                
622                # Extract start and end positions of the windows on this chromosome
623                window_starts_chr = self.physical_pos[:, 0][window_indices]
624                window_ends_chr = self.physical_pos[:, 1][window_indices]
625                
626                # Find the right-most window that a SNP would fit into (sorted order)
627                inds = np.searchsorted(window_starts_chr, snp_pos_chr, side='right') - 1
628                
629                # Mask valid SNPs: ensure they are within a valid range and fall inside window boundaries
630                valid_mask = (inds >= 0) & (inds < len(window_starts_chr)) & (snp_pos_chr <= window_ends_chr[inds])
631
632                # Assign valid SNPs to their corresponding window indices
633                snp_to_window_indices[snp_indices[valid_mask]] = window_indices[inds[valid_mask]]
634                log.debug(f"Number of SNPs within window ranges for chromosome {chrom}: {valid_mask.sum()}")
635
636            # Initialize SNP-level ancestry array with NaNs as default
637            shape = (n_snps, n_samples, 2) if lai_format == "3D" else (n_snps, n_samples * 2)
638            calldata_lai = np.full(shape, np.nan, dtype='uint8')
639
640            # Assign ancestry values to SNPs with valid window assignments
641            valid_mask = (snp_to_window_indices != -1)
642            snp_indices = np.where(valid_mask)[0]
643            snp_to_window_indices = snp_to_window_indices[snp_indices]
644
645            if lai_format == "3D":
646                calldata_lai[snp_indices] = lai_reshaped[snp_to_window_indices]
647            else:  # "2D"
648                calldata_lai[snp_indices] = self.lai[snp_to_window_indices]
649
650        if snpobject is not None:
651            # If a SNPObject was provided, update its attributes with any new values and add `calldata_lai``
652            snpobject.variants_chrom = variants_chrom
653            snpobject.variants_pos = variants_pos
654            snpobject.variants_ref = variants_ref
655            snpobject.variants_alt = variants_alt
656            snpobject.variants_filter_pass = variants_filter_pass
657            snpobject.variants_id = variants_id
658            snpobject.variants_qual = variants_qual
659            snpobject.calldata_lai = calldata_lai
660            snpobject.ancestry_map = self.ancestry_map
661            return snpobject
662        else:
663            # Otherwise, create a new SNPObject
664            return SNPObject(
665                calldata_lai=calldata_lai.view(),
666                samples=self.samples,
667                variants_ref=variants_ref.view() if isinstance(variants_ref, np.ndarray) else variants_ref,
668                variants_alt=variants_alt.view() if isinstance(variants_alt, np.ndarray) else variants_alt,
669                variants_filter_pass=variants_filter_pass.view() if isinstance(variants_filter_pass, np.ndarray) else variants_filter_pass,
670                variants_chrom=variants_chrom.view(),
671                variants_id=variants_id.view() if isinstance(variants_id, np.ndarray) else variants_id,
672                variants_pos=variants_pos.view(),
673                variants_qual=variants_qual.view() if isinstance(variants_qual, np.ndarray) else variants_qual,
674                ancestry_map=self.ancestry_map
675            )

Convert self into a snputils.snp.genobj.SNPObject SNP-level Local Ancestry Information (LAI), with optional support for SNP data.

If SNP positions (variants_pos) and/or chromosomes (variants_chrom) are not specified, the method generates SNPs uniformly across the start and end positions of each genomic window. Otherwise, the provided SNP coordinates are used to assign ancestry values based on their respective windows.

If a SNPObject is provided, its attributes are used unless explicitly overridden by the function arguments. In that case, the SNPObject is updated with the (optional) new attributes and the computed calldata_lai, then returned.

Arguments:
  • snpobject (SNPObject, optional): An existing SNPObject to extract SNP attributes from.
  • variants_chrom (array of shape (n_snps,), optional): An array containing the chromosome for each SNP.
  • variants_pos (array of shape (n_snps,), optional): An array containing the chromosomal positions for each SNP.
  • variants_ref (array of shape (n_snps,), optional): An array containing the reference allele for each SNP.
  • variants_alt (array of shape (n_snps,), optional): An array containing the alternate allele for each SNP.
  • variants_filter_pass (array of shape (n_snps,), optional): An array indicating whether each SNP passed control checks.
  • variants_id (array of shape (n_snps,), optional): An array containing unique identifiers (IDs) for each SNP.
  • variants_qual (array of shape (n_snps,), optional): An array containing the Phred-scaled quality score for each SNP.
  • lai_format (str, optional): Determines the shape of calldata_lai:
    • "3D" (default): Shape (n_snps, n_samples, 2).
    • "2D": Shape (n_snps, n_samples * 2).
Returns:

SNPObject: A SNPObject containing SNP-level ancestry data and updated SNP attributes.

def save(self, file: Union[str, pathlib._local.Path]) -> None:
699    def save(self, file: Union[str, Path]) -> None:
700        """
701        Save the data stored in `self` to a specified file.
702        If the file already exists, it will be overwritten.
703
704        The format of the saved file is determined by the file extension provided in the `file` 
705        argument.
706
707        **Supported formats:**
708
709        - `.msp`: Text-based MSP format.
710        - `.msp.tsv`: Text-based MSP format with TSV extension.
711        - `.pkl`: Pickle format for saving `self` in serialized form.
712
713        Args:
714            file (str or pathlib.Path): 
715                Path to the file where the data will be saved. The extension of the file determines the save format. 
716                Supported extensions: `.msp`, `.msp.tsv`, `.pkl`.
717        """
718        path = Path(file)
719        suffixes = [suffix.lower() for suffix in path.suffixes]
720
721        if suffixes[-2:] == ['.msp', '.tsv'] or suffixes[-1] == '.msp':
722            self.save_msp(file)
723        elif suffixes[-1] == '.pkl':
724            self.save_pickle(file)
725        else:
726            raise ValueError(
727                f"Unsupported file extension: {suffixes[-1]}"
728                "Supported extensions are: .msp, .msp.tsv, .pkl."
729            )

Save the data stored in self to a specified file. If the file already exists, it will be overwritten.

The format of the saved file is determined by the file extension provided in the file argument.

Supported formats:

  • .msp: Text-based MSP format.
  • .msp.tsv: Text-based MSP format with TSV extension.
  • .pkl: Pickle format for saving self in serialized form.
Arguments:
  • file (str or pathlib.Path): Path to the file where the data will be saved. The extension of the file determines the save format. Supported extensions: .msp, .msp.tsv, .pkl.
def save_msp(self, file: Union[str, pathlib._local.Path]) -> None:
731    def save_msp(self, file: Union[str, Path]) -> None:
732        """
733        Save the data stored in `self` to a `.msp` file.
734        If the file already exists, it will be overwritten.
735
736        Args:
737            file (str or pathlib.Path): 
738                Path to the file where the data will be saved. It should end with `.msp` or `.msp.tsv`. 
739                If the provided path does not have one of these extensions, the `.msp` extension will be appended.
740        """
741        from snputils.ancestry.io.local.write import MSPWriter
742
743        MSPWriter(self, file).write()

Save the data stored in self to a .msp file. If the file already exists, it will be overwritten.

Arguments:
  • file (str or pathlib.Path): Path to the file where the data will be saved. It should end with .msp or .msp.tsv. If the provided path does not have one of these extensions, the .msp extension will be appended.
def save_pickle(self, file: Union[str, pathlib._local.Path]) -> None:
745    def save_pickle(self, file: Union[str, Path]) -> None:
746        """
747        Save `self` in serialized form to a `.pkl` file.
748        If the file already exists, it will be overwritten.
749
750        Args:
751            file (str or pathlib.Path): 
752                Path to the file where the data will be saved. It should end with `.pkl`. 
753                If the provided path does not have this extension, it will be appended.
754        """
755        import pickle
756        with open(file, 'wb') as file:
757            pickle.dump(self, file)

Save self in serialized form to a .pkl file. If the file already exists, it will be overwritten.

Arguments:
  • file (str or pathlib.Path): Path to the file where the data will be saved. It should end with .pkl. If the provided path does not have this extension, it will be appended.