snputils.ancestry.genobj
10class GlobalAncestryObject(AncestryObject): 11 """ 12 A class for Global Ancestry Inference (GAI) data. 13 """ 14 def __init__( 15 self, 16 Q: np.ndarray, 17 P: np.ndarray, 18 samples: Optional[Sequence] = None, 19 snps: Optional[Sequence] = None, 20 ancestries: Optional[Sequence] = None 21 ) -> None: 22 """ 23 Args: 24 Q (array of shape (n_samples, n_ancestries)): 25 A 2D array containing per-sample ancestry proportions. Each row corresponds to a sample, 26 and each column corresponds to an ancestry. 27 P (array of shape (n_snps, n_ancestries)): 28 A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP, 29 and each column corresponds to an ancestry. 30 samples (sequence of length n_samples, optional): 31 A sequence containing unique identifiers for each sample. If None, sample identifiers 32 are assigned as integers from `0` to `n_samples - 1`. 33 snps (sequence of length n_snps, optional): 34 A sequence containing identifiers for each SNP. If None, SNPs are assigned as integers 35 from `0` to `n_snps - 1`. 36 ancestries (sequence of length n_samples, optional): 37 A sequence containing ancestry labels for each sample. 38 """ 39 # Determine dimensions 40 n_samples, n_ancestries_Q = Q.shape 41 n_snps, n_ancestries_P = P.shape 42 43 if n_ancestries_Q != n_ancestries_P: 44 raise ValueError( 45 f"The number of ancestries in Q ({n_ancestries_Q}) and P ({n_ancestries_P}) must be the same." 46 ) 47 48 n_ancestries = n_ancestries_Q 49 50 # Assign default sample identifiers if none provided 51 if samples is None: 52 samples = list(range(n_samples)) 53 else: 54 samples = list(samples) 55 if len(samples) != n_samples: 56 raise ValueError( 57 f"Length of samples ({len(samples)}) does not match number of samples ({n_samples})." 58 ) 59 60 # Assign default SNP identifiers if none provided 61 if snps is None: 62 snps = list(range(n_snps)) 63 else: 64 snps = list(snps) 65 if len(snps) != n_snps: 66 raise ValueError( 67 f"Length of snps ({len(snps)}) does not match number of SNPs ({n_snps})." 68 ) 69 70 if len(ancestries) != n_samples: 71 raise ValueError( 72 f"Length of ancestries ({len(ancestries)}) does not match number of samples ({n_samples})." 73 ) 74 75 super().__init__(n_samples, n_ancestries) 76 77 # Store attributes 78 self.__Q = Q 79 self.__P = P 80 self.__samples = np.asarray(samples) 81 self.__snps = np.asarray(snps) 82 self.__ancestries = np.asarray(ancestries) 83 84 # Perform sanity checks 85 self._sanity_check() 86 87 def __getitem__(self, key): 88 """ 89 To access an attribute of the class using the square bracket notation, 90 similar to a dictionary. 91 """ 92 try: 93 return getattr(self, key) 94 except AttributeError: 95 raise KeyError(f'Invalid key: {key}') 96 97 def __setitem__(self, key, value): 98 """ 99 To set an attribute of the class using the square bracket notation, 100 similar to a dictionary. 101 """ 102 try: 103 setattr(self, key, value) 104 except AttributeError: 105 raise KeyError(f'Invalid key: {key}') 106 107 @property 108 def Q(self) -> np.ndarray: 109 """ 110 Retrieve `Q`. 111 112 Returns: 113 **array of shape (n_samples, n_ancestries):** 114 A 2D array containing per-sample ancestry proportions. Each row corresponds to a sample, 115 and each column corresponds to an ancestry. 116 """ 117 return self.__Q 118 119 @Q.setter 120 def Q(self, x: np.ndarray): 121 """ 122 Update `Q`. 123 """ 124 if x.shape != (self.n_samples, self.n_ancestries): 125 raise ValueError( 126 f"Q must have shape ({self.n_samples}, {self.n_ancestries}); got {x.shape}." 127 ) 128 self.__Q = x 129 130 @property 131 def P(self) -> np.ndarray: 132 """ 133 Retrieve `P`. 134 135 Returns: 136 **array of shape (n_snps, n_ancestries):** 137 A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP, 138 and each column corresponds to an ancestry. 139 """ 140 return self.__P 141 142 @P.setter 143 def P(self, x: np.ndarray): 144 """ 145 Update `P`. 146 """ 147 if x.shape != (self.n_snps, self.n_ancestries): 148 raise ValueError( 149 f"P must have shape ({self.n_snps}, {self.n_ancestries}). Provided shape is {x.shape}." 150 ) 151 self.__P = x 152 self._sanity_check() 153 154 @property 155 def F(self) -> np.ndarray: 156 """ 157 Alias for `P`. 158 159 Returns: 160 **array of shape (n_snps, n_ancestries):** 161 A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP, 162 and each column corresponds to an ancestry. 163 """ 164 return self.P 165 166 @F.setter 167 def F(self, x: np.ndarray): 168 """ 169 Update `F`. 170 """ 171 if x.shape != (self.n_snps, self.n_ancestries): 172 raise ValueError( 173 f"F must have shape ({self.n_snps}, {self.n_ancestries}). Provided shape is {x.shape}." 174 ) 175 self.__P = x 176 177 @property 178 def samples(self) -> Optional[np.ndarray]: 179 """ 180 Retrieve `samples`. 181 182 Returns: 183 **array of shape (n_samples,):** 184 An array containing unique identifiers for each sample. If None, sample 185 identifiers are assigned as integers from `0` to `n_samples - 1`. 186 """ 187 return self.__samples 188 189 @samples.setter 190 def samples(self, x: Sequence): 191 """ 192 Update `samples`. 193 """ 194 x = list(x) 195 if len(x) != self.n_samples: 196 raise ValueError( 197 f"samples must have length {self.n_samples}; got length {len(x)}." 198 ) 199 self.__samples = x 200 201 @property 202 def snps(self) -> Optional[np.ndarray]: 203 """ 204 Retrieve `snps`. 205 206 Returns: 207 **array of shape (n_snps,):** 208 An array containing identifiers for each SNP. If None, SNPs are assigned as integers 209 from `0` to `n_snps - 1`. 210 """ 211 return self.__snps 212 213 @snps.setter 214 def snps(self, x: Sequence): 215 """ 216 Update `snps`. 217 """ 218 x = list(x) 219 if len(x) != self.n_snps: 220 raise ValueError( 221 f"snps must have length {self.n_snps}; got length {len(x)}." 222 ) 223 self.__snps = np.asarray(x) 224 225 @property 226 def ancestries(self) -> Optional[np.ndarray]: 227 """ 228 Retrieve `ancestries`. 229 230 Returns: 231 **array of shape (n_samples,):** 232 An array containing ancestry labels for each sample. 233 """ 234 return self.__ancestries 235 236 @ancestries.setter 237 def ancestries(self, x: Sequence): 238 """ 239 Update `ancestries`. 240 """ 241 x = list(x) 242 num_x = len(x) 243 num_unique_x = len(np.unique(x)) 244 245 if num_x != self.n_samples: 246 raise ValueError( 247 f"ancestries must have length {self.n_samples}; got length {num_x}." 248 ) 249 if num_unique_x <= self.n_ancestries: 250 raise ValueError( 251 f"Number of unique ancestry labels must be less than or equal to {self.n_ancestries}; got {num_unique_x} unique labels." 252 ) 253 self.__ancestries = np.asarray(x) 254 255 @property 256 def n_samples(self) -> int: 257 """ 258 Retrieve `n_samples`. 259 260 Returns: 261 **int:** The total number of samples. 262 """ 263 return self.__Q.shape[0] 264 265 @property 266 def n_snps(self) -> int: 267 """ 268 Retrieve `n_snps`. 269 270 Returns: 271 **int:** The total number of SNPs. 272 """ 273 return self.__P.shape[0] 274 275 @property 276 def n_ancestries(self) -> int: 277 """ 278 Retrieve `n_ancestries`. 279 280 Returns: 281 **int:** The total number of unique ancestries. 282 """ 283 return self.__Q.shape[1] 284 285 def copy(self) -> 'GlobalAncestryObject': 286 """ 287 Create and return a copy of `self`. 288 289 Returns: 290 **GlobalAncestryObject:** A new instance of the current object. 291 """ 292 return copy.copy(self) 293 294 def keys(self) -> List[str]: 295 """ 296 Retrieve a list of public attribute names for `self`. 297 298 Returns: 299 **list of str:** 300 A list of attribute names, with internal name-mangling removed, 301 for easier reference to public attributes in the instance. 302 """ 303 return [attr.replace('_GlobalAncestryObject__', '').replace('_AncestryObject__', '') for attr in vars(self)] 304 305 def _sanity_check(self) -> None: 306 """ 307 Perform sanity checks to ensure that matrix dimensions are consistent with expected sizes. 308 309 Raises: 310 **ValueError:** If any of the matrix dimensions do not match the expected sizes. 311 """ 312 # Check that the Q matrix has the correct shape 313 if self.__Q.shape != (self.n_samples, self.n_ancestries): 314 raise ValueError( 315 f"Q must have shape ({self.n_samples}, {self.n_ancestries}); got {self.__Q.shape}." 316 ) 317 318 # Check that the P matrix has the correct shape 319 if self.__P.shape != (self.n_snps, self.n_ancestries): 320 raise ValueError( 321 f"P must have shape ({self.n_snps}, {self.n_ancestries}); got {self.__P.shape}." 322 ) 323 324 # Check that samples length matches n_samples 325 if self.samples is not None: 326 if len(self.__samples) != self.n_samples: 327 raise ValueError( 328 f"samples must have length {self.n_samples}; got length {len(self.__samples)}." 329 ) 330 331 # Check that snps length matches n_snps 332 if self.snps is not None: 333 if len(self.__snps) != self.n_snps: 334 raise ValueError( 335 f"snps must have length {self.n_snps}; got length {len(self.__snps)}." 336 ) 337 338 # Check that ancestries length matches n_samples 339 if self.ancestries is not None: 340 if len(self.__ancestries) != self.n_samples: 341 raise ValueError( 342 f"ancestries must have length {self.n_samples}; got length {len(self.__ancestries)}." 343 ) 344 345 # Check number of unique ancestry labels 346 num_unique_ancestries = len(np.unique(self.__ancestries)) 347 if num_unique_ancestries <= self.n_ancestries: 348 raise ValueError( 349 f"Number of unique ancestry labels must be less than or equal to {self.n_ancestries}; got {num_unique_ancestries} unique labels." 350 ) 351 352 def save(self, file: Union[str, Path]) -> None: 353 """ 354 Save the data stored in `self` to a specified file or set of files. 355 356 The format of the saved file(s) is determined by the file extension provided in the `file` 357 argument. If the extension is `.pkl`, the object is serialized as a pickle file. Otherwise, 358 the file is treated as a prefix for saving ADMIXTURE files. 359 360 **Supported formats:** 361 362 - `.pkl`: Pickle format for saving `self` in serialized form. 363 - Any other extension or no extension: Treated as a prefix for ADMIXTURE files. 364 365 Args: 366 file (str or pathlib.Path): 367 Path to the file where the data will be saved. If the extension is `.pkl`, the object 368 is serialized. Otherwise, it is treated as a prefix for ADMIXTURE files. 369 """ 370 path = Path(file) 371 suffix = path.suffix.lower() 372 373 if suffix == '.pkl': 374 self.save_pickle(path) 375 else: 376 self.save_admixture(path) 377 378 def save_admixture(self, file_prefix: Union[str, Path]) -> None: 379 """ 380 Save the data stored in `self` into multiple ADMIXTURE files. 381 If the file already exists, it will be overwritten. 382 383 **Output files:** 384 385 - `<file_prefix>.K.Q`: Q matrix file. The file uses space (' ') as the delimiter. 386 - `<file_prefix>.K.P`: P matrix file. The file uses space (' ') as the delimiter. 387 - `<file_prefix>.sample_ids.txt`: Sample IDs file (if sample IDs are available). 388 - `<file_prefix>.snp_ids.txt`: SNP IDs file (if SNP IDs are available). 389 - `<file_prefix>.map`: Ancestry file (if ancestries information is available). 390 391 Args: 392 file_prefix (str or pathlib.Path): 393 The base prefix for output file names, including directory path but excluding file extensions. 394 The prefix is used to generate specific file names for each output, with file-specific 395 suffixes appended as described above (e.g., `file_prefix.n_ancestries.Q` for the Q matrix file). 396 """ 397 from snputils.ancestry.io.wide.write.admixture import AdmixtureWriter 398 399 AdmixtureWriter(self, file_prefix).write() 400 401 def save_pickle(self, file: Union[str, Path]) -> None: 402 """ 403 Save `self` in serialized form to a `.pkl` file. 404 If the file already exists, it will be overwritten. 405 406 Args: 407 file (str or pathlib.Path): 408 Path to the file where the data will be saved. It should end with `.pkl`. 409 If the provided path does not have this extension, it will be appended. 410 """ 411 import pickle 412 with open(file, 'wb') as file: 413 pickle.dump(self, file)
A class for Global Ancestry Inference (GAI) data.
14 def __init__( 15 self, 16 Q: np.ndarray, 17 P: np.ndarray, 18 samples: Optional[Sequence] = None, 19 snps: Optional[Sequence] = None, 20 ancestries: Optional[Sequence] = None 21 ) -> None: 22 """ 23 Args: 24 Q (array of shape (n_samples, n_ancestries)): 25 A 2D array containing per-sample ancestry proportions. Each row corresponds to a sample, 26 and each column corresponds to an ancestry. 27 P (array of shape (n_snps, n_ancestries)): 28 A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP, 29 and each column corresponds to an ancestry. 30 samples (sequence of length n_samples, optional): 31 A sequence containing unique identifiers for each sample. If None, sample identifiers 32 are assigned as integers from `0` to `n_samples - 1`. 33 snps (sequence of length n_snps, optional): 34 A sequence containing identifiers for each SNP. If None, SNPs are assigned as integers 35 from `0` to `n_snps - 1`. 36 ancestries (sequence of length n_samples, optional): 37 A sequence containing ancestry labels for each sample. 38 """ 39 # Determine dimensions 40 n_samples, n_ancestries_Q = Q.shape 41 n_snps, n_ancestries_P = P.shape 42 43 if n_ancestries_Q != n_ancestries_P: 44 raise ValueError( 45 f"The number of ancestries in Q ({n_ancestries_Q}) and P ({n_ancestries_P}) must be the same." 46 ) 47 48 n_ancestries = n_ancestries_Q 49 50 # Assign default sample identifiers if none provided 51 if samples is None: 52 samples = list(range(n_samples)) 53 else: 54 samples = list(samples) 55 if len(samples) != n_samples: 56 raise ValueError( 57 f"Length of samples ({len(samples)}) does not match number of samples ({n_samples})." 58 ) 59 60 # Assign default SNP identifiers if none provided 61 if snps is None: 62 snps = list(range(n_snps)) 63 else: 64 snps = list(snps) 65 if len(snps) != n_snps: 66 raise ValueError( 67 f"Length of snps ({len(snps)}) does not match number of SNPs ({n_snps})." 68 ) 69 70 if len(ancestries) != n_samples: 71 raise ValueError( 72 f"Length of ancestries ({len(ancestries)}) does not match number of samples ({n_samples})." 73 ) 74 75 super().__init__(n_samples, n_ancestries) 76 77 # Store attributes 78 self.__Q = Q 79 self.__P = P 80 self.__samples = np.asarray(samples) 81 self.__snps = np.asarray(snps) 82 self.__ancestries = np.asarray(ancestries) 83 84 # Perform sanity checks 85 self._sanity_check()
Arguments:
- Q (array of shape (n_samples, n_ancestries)): A 2D array containing per-sample ancestry proportions. Each row corresponds to a sample, and each column corresponds to an ancestry.
- P (array of shape (n_snps, n_ancestries)): A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP, and each column corresponds to an ancestry.
- samples (sequence of length n_samples, optional): A sequence containing unique identifiers for each sample. If None, sample identifiers
are assigned as integers from
0
ton_samples - 1
. - snps (sequence of length n_snps, optional): A sequence containing identifiers for each SNP. If None, SNPs are assigned as integers
from
0
ton_snps - 1
. - ancestries (sequence of length n_samples, optional): A sequence containing ancestry labels for each sample.
107 @property 108 def Q(self) -> np.ndarray: 109 """ 110 Retrieve `Q`. 111 112 Returns: 113 **array of shape (n_samples, n_ancestries):** 114 A 2D array containing per-sample ancestry proportions. Each row corresponds to a sample, 115 and each column corresponds to an ancestry. 116 """ 117 return self.__Q
Retrieve Q
.
Returns:
array of shape (n_samples, n_ancestries): A 2D array containing per-sample ancestry proportions. Each row corresponds to a sample, and each column corresponds to an ancestry.
130 @property 131 def P(self) -> np.ndarray: 132 """ 133 Retrieve `P`. 134 135 Returns: 136 **array of shape (n_snps, n_ancestries):** 137 A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP, 138 and each column corresponds to an ancestry. 139 """ 140 return self.__P
Retrieve P
.
Returns:
array of shape (n_snps, n_ancestries): A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP, and each column corresponds to an ancestry.
154 @property 155 def F(self) -> np.ndarray: 156 """ 157 Alias for `P`. 158 159 Returns: 160 **array of shape (n_snps, n_ancestries):** 161 A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP, 162 and each column corresponds to an ancestry. 163 """ 164 return self.P
Alias for P
.
Returns:
array of shape (n_snps, n_ancestries): A 2D array containing per-ancestry SNP frequencies. Each row corresponds to a SNP, and each column corresponds to an ancestry.
177 @property 178 def samples(self) -> Optional[np.ndarray]: 179 """ 180 Retrieve `samples`. 181 182 Returns: 183 **array of shape (n_samples,):** 184 An array containing unique identifiers for each sample. If None, sample 185 identifiers are assigned as integers from `0` to `n_samples - 1`. 186 """ 187 return self.__samples
Retrieve samples
.
Returns:
array of shape (n_samples,): An array containing unique identifiers for each sample. If None, sample identifiers are assigned as integers from
0
ton_samples - 1
.
201 @property 202 def snps(self) -> Optional[np.ndarray]: 203 """ 204 Retrieve `snps`. 205 206 Returns: 207 **array of shape (n_snps,):** 208 An array containing identifiers for each SNP. If None, SNPs are assigned as integers 209 from `0` to `n_snps - 1`. 210 """ 211 return self.__snps
Retrieve snps
.
Returns:
array of shape (n_snps,): An array containing identifiers for each SNP. If None, SNPs are assigned as integers from
0
ton_snps - 1
.
225 @property 226 def ancestries(self) -> Optional[np.ndarray]: 227 """ 228 Retrieve `ancestries`. 229 230 Returns: 231 **array of shape (n_samples,):** 232 An array containing ancestry labels for each sample. 233 """ 234 return self.__ancestries
Retrieve ancestries
.
Returns:
array of shape (n_samples,): An array containing ancestry labels for each sample.
285 def copy(self) -> 'GlobalAncestryObject': 286 """ 287 Create and return a copy of `self`. 288 289 Returns: 290 **GlobalAncestryObject:** A new instance of the current object. 291 """ 292 return copy.copy(self)
Create and return a copy of self
.
Returns:
GlobalAncestryObject: A new instance of the current object.
294 def keys(self) -> List[str]: 295 """ 296 Retrieve a list of public attribute names for `self`. 297 298 Returns: 299 **list of str:** 300 A list of attribute names, with internal name-mangling removed, 301 for easier reference to public attributes in the instance. 302 """ 303 return [attr.replace('_GlobalAncestryObject__', '').replace('_AncestryObject__', '') for attr in vars(self)]
Retrieve a list of public attribute names for self
.
Returns:
list of str: A list of attribute names, with internal name-mangling removed, for easier reference to public attributes in the instance.
352 def save(self, file: Union[str, Path]) -> None: 353 """ 354 Save the data stored in `self` to a specified file or set of files. 355 356 The format of the saved file(s) is determined by the file extension provided in the `file` 357 argument. If the extension is `.pkl`, the object is serialized as a pickle file. Otherwise, 358 the file is treated as a prefix for saving ADMIXTURE files. 359 360 **Supported formats:** 361 362 - `.pkl`: Pickle format for saving `self` in serialized form. 363 - Any other extension or no extension: Treated as a prefix for ADMIXTURE files. 364 365 Args: 366 file (str or pathlib.Path): 367 Path to the file where the data will be saved. If the extension is `.pkl`, the object 368 is serialized. Otherwise, it is treated as a prefix for ADMIXTURE files. 369 """ 370 path = Path(file) 371 suffix = path.suffix.lower() 372 373 if suffix == '.pkl': 374 self.save_pickle(path) 375 else: 376 self.save_admixture(path)
Save the data stored in self
to a specified file or set of files.
The format of the saved file(s) is determined by the file extension provided in the file
argument. If the extension is .pkl
, the object is serialized as a pickle file. Otherwise,
the file is treated as a prefix for saving ADMIXTURE files.
Supported formats:
.pkl
: Pickle format for savingself
in serialized form.- Any other extension or no extension: Treated as a prefix for ADMIXTURE files.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. If the extension is
.pkl
, the object is serialized. Otherwise, it is treated as a prefix for ADMIXTURE files.
378 def save_admixture(self, file_prefix: Union[str, Path]) -> None: 379 """ 380 Save the data stored in `self` into multiple ADMIXTURE files. 381 If the file already exists, it will be overwritten. 382 383 **Output files:** 384 385 - `<file_prefix>.K.Q`: Q matrix file. The file uses space (' ') as the delimiter. 386 - `<file_prefix>.K.P`: P matrix file. The file uses space (' ') as the delimiter. 387 - `<file_prefix>.sample_ids.txt`: Sample IDs file (if sample IDs are available). 388 - `<file_prefix>.snp_ids.txt`: SNP IDs file (if SNP IDs are available). 389 - `<file_prefix>.map`: Ancestry file (if ancestries information is available). 390 391 Args: 392 file_prefix (str or pathlib.Path): 393 The base prefix for output file names, including directory path but excluding file extensions. 394 The prefix is used to generate specific file names for each output, with file-specific 395 suffixes appended as described above (e.g., `file_prefix.n_ancestries.Q` for the Q matrix file). 396 """ 397 from snputils.ancestry.io.wide.write.admixture import AdmixtureWriter 398 399 AdmixtureWriter(self, file_prefix).write()
Save the data stored in self
into multiple ADMIXTURE files.
If the file already exists, it will be overwritten.
Output files:
<file_prefix>.K.Q
: Q matrix file. The file uses space (' ') as the delimiter.<file_prefix>.K.P
: P matrix file. The file uses space (' ') as the delimiter.<file_prefix>.sample_ids.txt
: Sample IDs file (if sample IDs are available).<file_prefix>.snp_ids.txt
: SNP IDs file (if SNP IDs are available).<file_prefix>.map
: Ancestry file (if ancestries information is available).
Arguments:
- file_prefix (str or pathlib.Path): The base prefix for output file names, including directory path but excluding file extensions.
The prefix is used to generate specific file names for each output, with file-specific
suffixes appended as described above (e.g.,
file_prefix.n_ancestries.Q
for the Q matrix file).
401 def save_pickle(self, file: Union[str, Path]) -> None: 402 """ 403 Save `self` in serialized form to a `.pkl` file. 404 If the file already exists, it will be overwritten. 405 406 Args: 407 file (str or pathlib.Path): 408 Path to the file where the data will be saved. It should end with `.pkl`. 409 If the provided path does not have this extension, it will be appended. 410 """ 411 import pickle 412 with open(file, 'wb') as file: 413 pickle.dump(self, file)
Save self
in serialized form to a .pkl
file.
If the file already exists, it will be overwritten.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.pkl
. If the provided path does not have this extension, it will be appended.
17class LocalAncestryObject(AncestryObject): 18 """ 19 A class for window-level Local Ancestry Inference (LAI) data. 20 """ 21 def __init__( 22 self, 23 haplotypes: List[str], 24 lai: np.ndarray, 25 samples: Optional[List[str]] = None, 26 ancestry_map: Optional[Dict[str, str]] = None, 27 window_sizes: Optional[np.ndarray] = None, 28 centimorgan_pos: Optional[np.ndarray] = None, 29 chromosomes: Optional[np.ndarray] = None, 30 physical_pos: Optional[np.ndarray] = None 31 ) -> None: 32 """ 33 Args: 34 haplotypes (list of str of length n_haplotypes): 35 A list of unique haplotype identifiers. 36 lai (array of shape (n_windows, n_haplotypes)): 37 A 2D array containing local ancestry inference values, where each row represents a 38 genomic window, and each column corresponds to a haplotype phase for each sample. 39 samples (list of str of length n_samples, optional): 40 A list of unique sample identifiers. 41 ancestry_map (dict of str to str, optional): 42 A dictionary mapping ancestry codes to region names. 43 window_sizes (array of shape (n_windows,), optional): 44 An array specifying the number of SNPs in each genomic window. 45 centimorgan_pos (array of shape (n_windows, 2), optional): 46 A 2D array containing the start and end centimorgan positions for each window. 47 chromosomes (array of shape (n_windows,), optional): 48 An array with chromosome numbers corresponding to each genomic window. 49 physical_pos (array of shape (n_windows, 2), optional): 50 A 2D array containing the start and end physical positions for each window. 51 """ 52 if lai.ndim != 2: 53 raise ValueError("`lai` must be a 2D array with shape (n_windows, n_haplotypes).") 54 55 # Determine the number of unique ancestries and samples from the LAI array 56 n_ancestries = len(np.unique(lai)) 57 n_haplotypes = lai.shape[1] 58 n_samples = n_haplotypes // 2 59 60 super(LocalAncestryObject, self).__init__(n_samples, n_ancestries) 61 62 self.__haplotypes = haplotypes 63 self.__lai = lai 64 self.__window_sizes = window_sizes 65 self.__centimorgan_pos = centimorgan_pos 66 self.__samples = samples 67 self.__chromosomes = chromosomes 68 self.__physical_pos = physical_pos 69 self.__ancestry_map = ancestry_map 70 71 # Perform sanity check to ensure all unique ancestries in LAI data are represented in the ancestry map 72 self._sanity_check() 73 74 def __getitem__(self, key): 75 """ 76 To access an attribute of the class using the square bracket notation, 77 similar to a dictionary. 78 """ 79 try: 80 return getattr(self, key) 81 except AttributeError: 82 raise KeyError(f'Invalid key: {key}') 83 84 def __setitem__(self, key, value): 85 """ 86 To set an attribute of the class using the square bracket notation, 87 similar to a dictionary. 88 """ 89 try: 90 setattr(self, key, value) 91 except AttributeError: 92 raise KeyError(f'Invalid key: {key}') 93 94 @property 95 def haplotypes(self) -> List[str]: 96 """ 97 Retrieve `haplotypes`. 98 99 Returns: 100 **list of length n_haplotypes:** A list of unique haplotype identifiers. 101 """ 102 return self.__haplotypes 103 104 @haplotypes.setter 105 def haplotypes(self, x): 106 """ 107 Update `haplotypes`. 108 """ 109 self.__haplotypes = x 110 111 @property 112 def lai(self) -> np.ndarray: 113 """ 114 Retrieve `lai`. 115 116 Returns: 117 **array of shape (n_windows, n_haplotypes):** 118 A 2D array containing local ancestry inference values, where each row represents a 119 genomic window, and each column corresponds to a haplotype phase for each sample. 120 """ 121 return self.__lai 122 123 @lai.setter 124 def lai(self, x): 125 """ 126 Update `lai`. 127 """ 128 self.__lai = x 129 130 @property 131 def samples(self) -> Optional[List[str]]: 132 """ 133 Retrieve `samples`. 134 135 Returns: 136 **list of str:** A list of unique sample identifiers. 137 """ 138 if self.__samples is not None: 139 return self.__samples 140 elif self.__haplotypes is not None: 141 return [hap.split('.')[0] for hap in self.__haplotypes][::2] 142 else: 143 return None 144 145 @samples.setter 146 def samples(self, x): 147 """ 148 Update `samples`. 149 """ 150 self.__samples = x 151 152 @property 153 def ancestry_map(self) -> Optional[Dict[str, str]]: 154 """ 155 Retrieve `ancestry_map`. 156 157 Returns: 158 **dict of str to str:** A dictionary mapping ancestry codes to region names. 159 """ 160 return self.__ancestry_map 161 162 @ancestry_map.setter 163 def ancestry_map(self, x): 164 """ 165 Update `ancestry_map`. 166 """ 167 self.__ancestry_map = x 168 169 @property 170 def window_sizes(self) -> Optional[np.ndarray]: 171 """ 172 Retrieve `window_sizes`. 173 174 Returns: 175 **array of shape (n_windows,):** 176 An array specifying the number of SNPs in each genomic window. 177 """ 178 return self.__window_sizes 179 180 @window_sizes.setter 181 def window_sizes(self, x): 182 """ 183 Update `window_sizes`. 184 """ 185 self.__window_sizes = x 186 187 @property 188 def centimorgan_pos(self) -> Optional[np.ndarray]: 189 """ 190 Retrieve `centimorgan_pos`. 191 192 Returns: 193 **array of shape (n_windows, 2):** 194 A 2D array containing the start and end centimorgan positions for each window. 195 """ 196 return self.__centimorgan_pos 197 198 @centimorgan_pos.setter 199 def centimorgan_pos(self, x): 200 """ 201 Update `centimorgan_pos`. 202 """ 203 self.__centimorgan_pos = x 204 205 @property 206 def chromosomes(self) -> Optional[np.ndarray]: 207 """ 208 Retrieve `chromosomes`. 209 210 Returns: 211 **array of shape (n_windows,):** 212 An array with chromosome numbers corresponding to each genomic window. 213 """ 214 return self.__chromosomes 215 216 @chromosomes.setter 217 def chromosomes(self, x): 218 """ 219 Update `chromosomes`. 220 """ 221 self.__chromosomes = x 222 223 @property 224 def physical_pos(self) -> Optional[np.ndarray]: 225 """ 226 Retrieve `physical_pos`. 227 228 Returns: 229 **array of shape (n_windows, 2):** 230 A 2D array containing the start and end physical positions for each window. 231 """ 232 return self.__physical_pos 233 234 @physical_pos.setter 235 def physical_pos(self, x): 236 """ 237 Update `physical_pos`. 238 """ 239 self.__physical_pos = x 240 241 @property 242 def n_samples(self) -> int: 243 """ 244 Retrieve `n_samples`. 245 246 Returns: 247 **int:** 248 The total number of samples. 249 """ 250 if self.__samples is not None: 251 return len(self.__samples) 252 elif self.__haplotypes is not None: 253 # Divide by 2 because each sample has two associated haplotypes 254 return len(self.__haplotypes) // 2 255 else: 256 #Â Divide by 2 because columns represent haplotypes 257 return self.__lai.shape[1] // 2 258 259 @property 260 def n_ancestries(self) -> int: 261 """ 262 Retrieve `n_ancestries`. 263 264 Returns: 265 **int:** The total number of unique ancestries. 266 """ 267 return len(np.unique(self.__lai)) 268 269 @property 270 def n_haplotypes(self) -> int: 271 """ 272 Retrieve `n_haplotypes`. 273 274 Returns: 275 **int:** The total number of haplotypes. 276 """ 277 if self.__haplotypes is not None: 278 return len(self.__haplotypes) 279 else: 280 return self.__lai.shape[1] 281 282 @property 283 def n_windows(self) -> int: 284 """ 285 Retrieve `n_windows`. 286 287 Returns: 288 **int:** The total number of genomic windows. 289 """ 290 return self.__lai.shape[0] 291 292 def copy(self) -> 'LocalAncestryObject': 293 """ 294 Create and return a copy of `self`. 295 296 Returns: 297 **LocalAncestryObject:** 298 A new instance of the current object. 299 """ 300 return copy.copy(self) 301 302 def keys(self) -> List[str]: 303 """ 304 Retrieve a list of public attribute names for `self`. 305 306 Returns: 307 **list of str:** 308 A list of attribute names, with internal name-mangling removed, 309 for easier reference to public attributes in the instance. 310 """ 311 return [attr.replace('_LocalAncestryObject__', '').replace('_AncestryObject__', '') for attr in vars(self)] 312 313 def filter_windows( 314 self, 315 indexes: Union[int, Sequence[int], np.ndarray], 316 include: bool = True, 317 inplace: bool = False 318 ) -> Optional['LocalAncestryObject']: 319 """ 320 Filter genomic windows based on specified indexes. 321 322 This method updates the `lai` attribute to include or exclude the specified genomic windows. 323 Attributes such as `window_sizes`, `centimorgan_pos`, `chromosomes`, and `physical_pos` will also be 324 updated accordingly if they are not None. The order of genomic windows is preserved. 325 326 Negative indexes are supported and follow 327 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 328 329 Args: 330 indexes (int or array-like of int): 331 Index(es) of the windows to include or exclude. Can be a single integer or a 332 sequence of integers. Negative indexes are supported. 333 include (bool, default=True): 334 If True, includes only the specified windows. If False, excludes the specified 335 windows. Default is True. 336 inplace (bool, default=False): 337 If True, modifies `self` in place. If False, returns a new `LocalAncestryObject` with 338 the windows filtered. Default is False. 339 340 Returns: 341 **Optional[LocalAncestryObject]:** 342 A new `LocalAncestryObject` with the specified windows filtered if `inplace=False`. 343 If `inplace=True`, modifies `self` in place and returns None. 344 """ 345 # Convert indexes to a NumPy array 346 indexes = np.atleast_1d(indexes) 347 348 # Get total number of windows 349 n_windows = self.n_windows 350 351 # Validate indexes, allowing negative indexes 352 if np.any((indexes < -n_windows) | (indexes >= n_windows)): 353 raise IndexError("One or more indexes are out of bounds.") 354 355 # Create boolean mask 356 mask = np.zeros(n_windows, dtype=bool) 357 mask[indexes] = True 358 359 # Invert mask if `include=False` 360 if not include: 361 mask = ~mask 362 363 # Filter `lai` 364 filtered_lai = self['lai'][mask, :] 365 366 # Filter `window_sizes`, `chromosomes`, `centimorgan_pos`, and `physical_pos`, checking if they are None before filtering 367 filtered_window_sizes = self['window_sizes'][mask] if self['window_sizes'] is not None else None 368 filtered_chromosomes = self['chromosomes'][mask] if self['chromosomes'] is not None else None 369 filtered_centimorgan_pos = self['centimorgan_pos'][mask, :] if self['centimorgan_pos'] is not None else None 370 filtered_physical_pos = self['physical_pos'][mask, :] if self['physical_pos'] is not None else None 371 372 # Modify the original object if `inplace=True`, otherwise create and return a copy 373 if inplace: 374 self['lai'] = filtered_lai 375 if filtered_window_sizes is not None: 376 self['window_sizes'] = filtered_window_sizes 377 if filtered_chromosomes is not None: 378 self['chromosomes'] = filtered_chromosomes 379 if filtered_centimorgan_pos is not None: 380 self['centimorgan_pos'] = filtered_centimorgan_pos 381 if filtered_physical_pos is not None: 382 self['physical_pos'] = filtered_physical_pos 383 return None 384 else: 385 laiobj = self.copy() 386 laiobj['lai'] = filtered_lai 387 if filtered_window_sizes is not None: 388 laiobj['window_sizes'] = filtered_window_sizes 389 if filtered_chromosomes is not None: 390 laiobj['chromosomes'] = filtered_chromosomes 391 if filtered_centimorgan_pos is not None: 392 laiobj['centimorgan_pos'] = filtered_centimorgan_pos 393 if filtered_physical_pos is not None: 394 laiobj['physical_pos'] = filtered_physical_pos 395 return laiobj 396 397 def filter_samples( 398 self, 399 samples: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 400 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 401 include: bool = True, 402 inplace: bool = False 403 ) -> Optional['LocalAncestryObject']: 404 """ 405 Filter samples based on specified names or indexes. 406 407 This method updates the `lai`, `haplotypes`, and `samples` attributes to include or exclude the specified 408 samples. Each sample is associated with two haplotypes, which are included or excluded together. 409 The order of the samples is preserved. 410 411 If both samples and indexes are provided, any sample matching either a name in samples or an index in 412 indexes will be included or excluded. 413 414 Negative indexes are supported and follow 415 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 416 417 Args: 418 samples (str or array_like of str, optional): 419 Name(s) of the samples to include or exclude. Can be a single sample name or a 420 sequence of sample names. Default is None. 421 indexes (int or array_like of int, optional): 422 Index(es) of the samples to include or exclude. Can be a single index or a sequence 423 of indexes. Negative indexes are supported. Default is None. 424 include (bool, default=True): 425 If True, includes only the specified samples. If False, excludes the specified 426 samples. Default is True. 427 inplace (bool, default=False): 428 If True, modifies `self` in place. If False, returns a new `LocalAncestryObject` with the 429 samples filtered. Default is False. 430 431 Returns: 432 **Optional[LocalAncestryObject]:** 433 A new `LocalAncestryObject` with the specified samples filtered if `inplace=False`. 434 If `inplace=True`, modifies `self` in place and returns None. 435 """ 436 if samples is None and indexes is None: 437 raise UserWarning("At least one of 'samples' or 'indexes' must be provided.") 438 439 n_haplotypes = self.n_haplotypes 440 n_samples = self.n_samples 441 442 # Create mask based on sample names 443 if samples is not None: 444 samples = np.atleast_1d(samples) 445 # Extract sample names from haplotype identifiers 446 haplotype_ids = np.array(self['haplotypes']) 447 sample_names = np.array([hap.split('.')[0] for hap in haplotype_ids]) 448 # Create mask for haplotypes belonging to specified samples 449 mask_samples = np.isin(sample_names, samples) 450 else: 451 mask_samples = np.zeros(n_haplotypes, dtype=bool) 452 453 # Create mask based on sample indexes 454 if indexes is not None: 455 indexes = np.atleast_1d(indexes) 456 457 # Validate indexes, allowing negative indexes 458 out_of_bounds_indexes = indexes[(indexes < -n_samples) | (indexes >= n_samples)] 459 if out_of_bounds_indexes.size > 0: 460 raise ValueError(f"One or more sample indexes are out of bounds.") 461 462 # Adjust negative indexes 463 indexes = np.mod(indexes, n_samples) 464 465 # Get haplotype indexes for the specified sample indexes 466 haplotype_indexes = np.concatenate([2*indexes, 2*indexes+1]) 467 # Create mask for haplotypes 468 mask_indexes = np.zeros(n_haplotypes, dtype=bool) 469 mask_indexes[haplotype_indexes] = True 470 else: 471 mask_indexes = np.zeros(n_haplotypes, dtype=bool) 472 473 # Combine masks using logical OR (union of samples) 474 mask_combined = mask_samples | mask_indexes 475 476 if not include: 477 mask_combined = ~mask_combined 478 479 # Filter `lai` 480 filtered_lai = self['lai'][:, mask_combined] 481 482 # Filter `haplotypes` 483 filtered_haplotypes = np.array(self['haplotypes'])[mask_combined].tolist() 484 485 # Filter `samples`, checking if they are None before filtering 486 sample_mask = mask_combined.reshape(-1, 2).any(axis=1) 487 filtered_samples = np.array(self['samples'])[sample_mask].tolist() if self['samples'] is not None else None 488 489 if inplace: 490 self['haplotypes'] = filtered_haplotypes 491 self['samples'] = filtered_samples 492 self['lai'] = filtered_lai 493 return None 494 else: 495 laiobj = self.copy() 496 laiobj['haplotypes'] = filtered_haplotypes 497 laiobj['samples'] = filtered_samples 498 laiobj['lai'] = filtered_lai 499 return laiobj 500 501 def convert_to_snp_level( 502 self, 503 snpobject: Optional['SNPObject'] = None, 504 variants_chrom: Optional[np.ndarray] = None, 505 variants_pos: Optional[np.ndarray] = None, 506 variants_ref: Optional[np.ndarray] = None, 507 variants_alt: Optional[np.ndarray] = None, 508 variants_filter_pass: Optional[np.ndarray] = None, 509 variants_id: Optional[np.ndarray] = None, 510 variants_qual: Optional[np.ndarray] = None, 511 lai_format: str = "3D" 512 ) -> 'SNPObject': 513 """ 514 Convert `self` into a `snputils.snp.genobj.SNPObject` SNP-level Local Ancestry Information (LAI), 515 with optional support for SNP data. 516 517 If SNP positions (`variants_pos`) and/or chromosomes (`variants_chrom`) are not specified, the method generates 518 SNPs uniformly across the start and end positions of each genomic window. Otherwise, the provided SNP 519 coordinates are used to assign ancestry values based on their respective windows. 520 521 If a `SNPObject` is provided, its attributes are used unless explicitly overridden by the function arguments. 522 In that case, the SNPObject is updated with the (optional) new attributes and the computed `calldata_lai`, then returned. 523 524 Args: 525 snpobject (SNPObject, optional): 526 An existing `SNPObject` to extract SNP attributes from. 527 variants_chrom (array of shape (n_snps,), optional): 528 An array containing the chromosome for each SNP. 529 variants_pos (array of shape (n_snps,), optional): 530 An array containing the chromosomal positions for each SNP. 531 variants_ref (array of shape (n_snps,), optional): 532 An array containing the reference allele for each SNP. 533 variants_alt (array of shape (n_snps,), optional): 534 An array containing the alternate allele for each SNP. 535 variants_filter_pass (array of shape (n_snps,), optional): 536 An array indicating whether each SNP passed control checks. 537 variants_id (array of shape (n_snps,), optional): 538 An array containing unique identifiers (IDs) for each SNP. 539 variants_qual (array of shape (n_snps,), optional): 540 An array containing the Phred-scaled quality score for each SNP. 541 lai_format (str, optional): 542 Determines the shape of `calldata_lai`: 543 - `"3D"` (default): Shape `(n_snps, n_samples, 2)`. 544 - `"2D"`: Shape `(n_snps, n_samples * 2)`. 545 546 Returns: 547 **SNPObject**: 548 A `SNPObject` containing SNP-level ancestry data and updated SNP attributes. 549 """ 550 from snputils.snp.genobj.snpobj import SNPObject 551 552 assert lai_format in {"2D", "3D"}, "Invalid `lai_format`. Must be '2D' or '3D'." 553 554 # Extract attributes from SNPObject if provided 555 if snpobject is not None: 556 variants_chrom = variants_chrom or snpobject.variants_chrom 557 variants_pos = variants_pos or snpobject.variants_pos 558 variants_ref = variants_ref or snpobject.variants_ref 559 variants_alt = variants_alt or snpobject.variants_alt 560 variants_filter_pass = variants_filter_pass or snpobject.variants_filter_pass 561 variants_id = variants_id or snpobject.variants_id 562 variants_qual = variants_qual or snpobject.variants_qual 563 564 n_samples = self.n_samples 565 lai_reshaped = self.lai.reshape(self.n_windows, n_samples, 2).astype(int) if lai_format == "3D" else None 566 567 if variants_pos is None or variants_chrom is None: 568 # Generate all SNP positions and corresponding chromosome labels between window boundaries 569 variants_pos_list = [] 570 variants_chrom_list = [] 571 ancestry_list = [] 572 573 for i in range(self.n_windows): 574 start = int(self.physical_pos[i, 0]) 575 end = int(self.physical_pos[i, 1]) 576 chrom = self.chromosomes[i] 577 578 # Generate SNP positions at each base pair within the window range 579 positions_in_window = np.arange(start, end + 1) 580 if positions_in_window.size == 0: 581 continue # Skip windows that contain no valid SNP positions 582 583 n_positions = positions_in_window.size 584 variants_pos_list.append(positions_in_window) 585 variants_chrom_list.append(np.full(n_positions, chrom)) 586 587 ancestry_repeated = ( 588 np.repeat(lai_reshaped[i, np.newaxis, :, :], n_positions, axis=0) 589 if lai_format == "3D" else np.repeat(self.lai[i, np.newaxis, :], n_positions, axis=0) 590 ) 591 ancestry_list.append(ancestry_repeated) 592 593 # Store SNP positions, their corresponding chromosome labels, and their associated ancestry 594 variants_pos = np.concatenate(variants_pos_list) 595 variants_chrom = np.concatenate(variants_chrom_list) 596 calldata_lai = np.concatenate(ancestry_list) 597 else: 598 # Use the provided SNP positions and chromosomes 599 n_snps = len(variants_pos) 600 if len(variants_chrom) != n_snps: 601 raise ValueError("`variants_pos` and `variants_chrom` must have the same length.") 602 603 # Initialize an array to store the corresponding window index for each SNP 604 # Default value is -1, meaning no matching window found 605 snp_to_window_indices = np.full(n_snps, -1, dtype=int) 606 607 # Identify unique chromosome names sorted by order of appearence 608 _, idx = np.unique(variants_chrom, return_index=True) 609 unique_chroms = variants_chrom[np.sort(idx)] 610 611 # Iterate through each unique chromosome to map SNPs to windows 612 for chrom in unique_chroms: 613 # Get indices of SNPs that belong to the current chromosome 614 snp_indices = np.where(variants_chrom == chrom)[0] 615 snp_pos_chr = variants_pos[snp_indices] 616 617 # Get indices of windows that belong to the current chromosome 618 window_indices = np.where(self.chromosomes == chrom)[0] 619 if window_indices.size == 0: 620 continue # Skip if no windows exist for this chromosome 621 622 # Extract start and end positions of the windows on this chromosome 623 window_starts_chr = self.physical_pos[:, 0][window_indices] 624 window_ends_chr = self.physical_pos[:, 1][window_indices] 625 626 # Find the right-most window that a SNP would fit into (sorted order) 627 inds = np.searchsorted(window_starts_chr, snp_pos_chr, side='right') - 1 628 629 # Mask valid SNPs: ensure they are within a valid range and fall inside window boundaries 630 valid_mask = (inds >= 0) & (inds < len(window_starts_chr)) & (snp_pos_chr <= window_ends_chr[inds]) 631 632 # Assign valid SNPs to their corresponding window indices 633 snp_to_window_indices[snp_indices[valid_mask]] = window_indices[inds[valid_mask]] 634 log.debug(f"Number of SNPs within window ranges for chromosome {chrom}: {valid_mask.sum()}") 635 636 # Initialize SNP-level ancestry array with NaNs as default 637 shape = (n_snps, n_samples, 2) if lai_format == "3D" else (n_snps, n_samples * 2) 638 calldata_lai = np.full(shape, np.nan, dtype='uint8') 639 640 # Assign ancestry values to SNPs with valid window assignments 641 valid_mask = (snp_to_window_indices != -1) 642 snp_indices = np.where(valid_mask)[0] 643 snp_to_window_indices = snp_to_window_indices[snp_indices] 644 645 if lai_format == "3D": 646 calldata_lai[snp_indices] = lai_reshaped[snp_to_window_indices] 647 else: # "2D" 648 calldata_lai[snp_indices] = self.lai[snp_to_window_indices] 649 650 if snpobject is not None: 651 # If a SNPObject was provided, update its attributes with any new values and add `calldata_lai`` 652 snpobject.variants_chrom = variants_chrom 653 snpobject.variants_pos = variants_pos 654 snpobject.variants_ref = variants_ref 655 snpobject.variants_alt = variants_alt 656 snpobject.variants_filter_pass = variants_filter_pass 657 snpobject.variants_id = variants_id 658 snpobject.variants_qual = variants_qual 659 snpobject.calldata_lai = calldata_lai 660 snpobject.ancestry_map = self.ancestry_map 661 return snpobject 662 else: 663 # Otherwise, create a new SNPObject 664 return SNPObject( 665 calldata_lai=calldata_lai.view(), 666 samples=self.samples, 667 variants_ref=variants_ref.view() if isinstance(variants_ref, np.ndarray) else variants_ref, 668 variants_alt=variants_alt.view() if isinstance(variants_alt, np.ndarray) else variants_alt, 669 variants_filter_pass=variants_filter_pass.view() if isinstance(variants_filter_pass, np.ndarray) else variants_filter_pass, 670 variants_chrom=variants_chrom.view(), 671 variants_id=variants_id.view() if isinstance(variants_id, np.ndarray) else variants_id, 672 variants_pos=variants_pos.view(), 673 variants_qual=variants_qual.view() if isinstance(variants_qual, np.ndarray) else variants_qual, 674 ancestry_map=self.ancestry_map 675 ) 676 677 def _sanity_check(self) -> None: 678 """ 679 Perform sanity checks on the parsed data to ensure data integrity. 680 681 This method checks that all unique ancestries in LAI are represented 682 in the ancestry map. 683 684 Args: 685 lai (np.ndarray): The LAI data array. 686 ancestry_map (dict, optional): A dictionary mapping ancestry codes to region names, if available. 687 """ 688 # Get unique ancestries from LAI data 689 unique_ancestries = np.unique(self.lai) 690 691 if self.ancestry_map is not None: 692 # Check if all unique ancestries in the LAI are present in the ancestry map 693 for ancestry in unique_ancestries: 694 if str(ancestry) not in self.ancestry_map: 695 warnings.warn( 696 f"Ancestry '{ancestry}' found in LAI data is not represented in the ancestry map." 697 ) 698 699 def save(self, file: Union[str, Path]) -> None: 700 """ 701 Save the data stored in `self` to a specified file. 702 If the file already exists, it will be overwritten. 703 704 The format of the saved file is determined by the file extension provided in the `file` 705 argument. 706 707 **Supported formats:** 708 709 - `.msp`: Text-based MSP format. 710 - `.msp.tsv`: Text-based MSP format with TSV extension. 711 - `.pkl`: Pickle format for saving `self` in serialized form. 712 713 Args: 714 file (str or pathlib.Path): 715 Path to the file where the data will be saved. The extension of the file determines the save format. 716 Supported extensions: `.msp`, `.msp.tsv`, `.pkl`. 717 """ 718 path = Path(file) 719 suffixes = [suffix.lower() for suffix in path.suffixes] 720 721 if suffixes[-2:] == ['.msp', '.tsv'] or suffixes[-1] == '.msp': 722 self.save_msp(file) 723 elif suffixes[-1] == '.pkl': 724 self.save_pickle(file) 725 else: 726 raise ValueError( 727 f"Unsupported file extension: {suffixes[-1]}" 728 "Supported extensions are: .msp, .msp.tsv, .pkl." 729 ) 730 731 def save_msp(self, file: Union[str, Path]) -> None: 732 """ 733 Save the data stored in `self` to a `.msp` file. 734 If the file already exists, it will be overwritten. 735 736 Args: 737 file (str or pathlib.Path): 738 Path to the file where the data will be saved. It should end with `.msp` or `.msp.tsv`. 739 If the provided path does not have one of these extensions, the `.msp` extension will be appended. 740 """ 741 from snputils.ancestry.io.local.write import MSPWriter 742 743 MSPWriter(self, file).write() 744 745 def save_pickle(self, file: Union[str, Path]) -> None: 746 """ 747 Save `self` in serialized form to a `.pkl` file. 748 If the file already exists, it will be overwritten. 749 750 Args: 751 file (str or pathlib.Path): 752 Path to the file where the data will be saved. It should end with `.pkl`. 753 If the provided path does not have this extension, it will be appended. 754 """ 755 import pickle 756 with open(file, 'wb') as file: 757 pickle.dump(self, file)
A class for window-level Local Ancestry Inference (LAI) data.
21 def __init__( 22 self, 23 haplotypes: List[str], 24 lai: np.ndarray, 25 samples: Optional[List[str]] = None, 26 ancestry_map: Optional[Dict[str, str]] = None, 27 window_sizes: Optional[np.ndarray] = None, 28 centimorgan_pos: Optional[np.ndarray] = None, 29 chromosomes: Optional[np.ndarray] = None, 30 physical_pos: Optional[np.ndarray] = None 31 ) -> None: 32 """ 33 Args: 34 haplotypes (list of str of length n_haplotypes): 35 A list of unique haplotype identifiers. 36 lai (array of shape (n_windows, n_haplotypes)): 37 A 2D array containing local ancestry inference values, where each row represents a 38 genomic window, and each column corresponds to a haplotype phase for each sample. 39 samples (list of str of length n_samples, optional): 40 A list of unique sample identifiers. 41 ancestry_map (dict of str to str, optional): 42 A dictionary mapping ancestry codes to region names. 43 window_sizes (array of shape (n_windows,), optional): 44 An array specifying the number of SNPs in each genomic window. 45 centimorgan_pos (array of shape (n_windows, 2), optional): 46 A 2D array containing the start and end centimorgan positions for each window. 47 chromosomes (array of shape (n_windows,), optional): 48 An array with chromosome numbers corresponding to each genomic window. 49 physical_pos (array of shape (n_windows, 2), optional): 50 A 2D array containing the start and end physical positions for each window. 51 """ 52 if lai.ndim != 2: 53 raise ValueError("`lai` must be a 2D array with shape (n_windows, n_haplotypes).") 54 55 # Determine the number of unique ancestries and samples from the LAI array 56 n_ancestries = len(np.unique(lai)) 57 n_haplotypes = lai.shape[1] 58 n_samples = n_haplotypes // 2 59 60 super(LocalAncestryObject, self).__init__(n_samples, n_ancestries) 61 62 self.__haplotypes = haplotypes 63 self.__lai = lai 64 self.__window_sizes = window_sizes 65 self.__centimorgan_pos = centimorgan_pos 66 self.__samples = samples 67 self.__chromosomes = chromosomes 68 self.__physical_pos = physical_pos 69 self.__ancestry_map = ancestry_map 70 71 # Perform sanity check to ensure all unique ancestries in LAI data are represented in the ancestry map 72 self._sanity_check()
Arguments:
- haplotypes (list of str of length n_haplotypes): A list of unique haplotype identifiers.
- lai (array of shape (n_windows, n_haplotypes)): A 2D array containing local ancestry inference values, where each row represents a genomic window, and each column corresponds to a haplotype phase for each sample.
- samples (list of str of length n_samples, optional): A list of unique sample identifiers.
- ancestry_map (dict of str to str, optional): A dictionary mapping ancestry codes to region names.
- window_sizes (array of shape (n_windows,), optional): An array specifying the number of SNPs in each genomic window.
- centimorgan_pos (array of shape (n_windows, 2), optional): A 2D array containing the start and end centimorgan positions for each window.
- chromosomes (array of shape (n_windows,), optional): An array with chromosome numbers corresponding to each genomic window.
- physical_pos (array of shape (n_windows, 2), optional): A 2D array containing the start and end physical positions for each window.
111 @property 112 def lai(self) -> np.ndarray: 113 """ 114 Retrieve `lai`. 115 116 Returns: 117 **array of shape (n_windows, n_haplotypes):** 118 A 2D array containing local ancestry inference values, where each row represents a 119 genomic window, and each column corresponds to a haplotype phase for each sample. 120 """ 121 return self.__lai
Retrieve lai
.
Returns:
array of shape (n_windows, n_haplotypes): A 2D array containing local ancestry inference values, where each row represents a genomic window, and each column corresponds to a haplotype phase for each sample.
130 @property 131 def samples(self) -> Optional[List[str]]: 132 """ 133 Retrieve `samples`. 134 135 Returns: 136 **list of str:** A list of unique sample identifiers. 137 """ 138 if self.__samples is not None: 139 return self.__samples 140 elif self.__haplotypes is not None: 141 return [hap.split('.')[0] for hap in self.__haplotypes][::2] 142 else: 143 return None
152 @property 153 def ancestry_map(self) -> Optional[Dict[str, str]]: 154 """ 155 Retrieve `ancestry_map`. 156 157 Returns: 158 **dict of str to str:** A dictionary mapping ancestry codes to region names. 159 """ 160 return self.__ancestry_map
Retrieve ancestry_map
.
Returns:
dict of str to str: A dictionary mapping ancestry codes to region names.
169 @property 170 def window_sizes(self) -> Optional[np.ndarray]: 171 """ 172 Retrieve `window_sizes`. 173 174 Returns: 175 **array of shape (n_windows,):** 176 An array specifying the number of SNPs in each genomic window. 177 """ 178 return self.__window_sizes
Retrieve window_sizes
.
Returns:
array of shape (n_windows,): An array specifying the number of SNPs in each genomic window.
187 @property 188 def centimorgan_pos(self) -> Optional[np.ndarray]: 189 """ 190 Retrieve `centimorgan_pos`. 191 192 Returns: 193 **array of shape (n_windows, 2):** 194 A 2D array containing the start and end centimorgan positions for each window. 195 """ 196 return self.__centimorgan_pos
Retrieve centimorgan_pos
.
Returns:
array of shape (n_windows, 2): A 2D array containing the start and end centimorgan positions for each window.
205 @property 206 def chromosomes(self) -> Optional[np.ndarray]: 207 """ 208 Retrieve `chromosomes`. 209 210 Returns: 211 **array of shape (n_windows,):** 212 An array with chromosome numbers corresponding to each genomic window. 213 """ 214 return self.__chromosomes
Retrieve chromosomes
.
Returns:
array of shape (n_windows,): An array with chromosome numbers corresponding to each genomic window.
223 @property 224 def physical_pos(self) -> Optional[np.ndarray]: 225 """ 226 Retrieve `physical_pos`. 227 228 Returns: 229 **array of shape (n_windows, 2):** 230 A 2D array containing the start and end physical positions for each window. 231 """ 232 return self.__physical_pos
Retrieve physical_pos
.
Returns:
array of shape (n_windows, 2): A 2D array containing the start and end physical positions for each window.
241 @property 242 def n_samples(self) -> int: 243 """ 244 Retrieve `n_samples`. 245 246 Returns: 247 **int:** 248 The total number of samples. 249 """ 250 if self.__samples is not None: 251 return len(self.__samples) 252 elif self.__haplotypes is not None: 253 # Divide by 2 because each sample has two associated haplotypes 254 return len(self.__haplotypes) // 2 255 else: 256 #Â Divide by 2 because columns represent haplotypes 257 return self.__lai.shape[1] // 2
292 def copy(self) -> 'LocalAncestryObject': 293 """ 294 Create and return a copy of `self`. 295 296 Returns: 297 **LocalAncestryObject:** 298 A new instance of the current object. 299 """ 300 return copy.copy(self)
Create and return a copy of self
.
Returns:
LocalAncestryObject: A new instance of the current object.
302 def keys(self) -> List[str]: 303 """ 304 Retrieve a list of public attribute names for `self`. 305 306 Returns: 307 **list of str:** 308 A list of attribute names, with internal name-mangling removed, 309 for easier reference to public attributes in the instance. 310 """ 311 return [attr.replace('_LocalAncestryObject__', '').replace('_AncestryObject__', '') for attr in vars(self)]
Retrieve a list of public attribute names for self
.
Returns:
list of str: A list of attribute names, with internal name-mangling removed, for easier reference to public attributes in the instance.
313 def filter_windows( 314 self, 315 indexes: Union[int, Sequence[int], np.ndarray], 316 include: bool = True, 317 inplace: bool = False 318 ) -> Optional['LocalAncestryObject']: 319 """ 320 Filter genomic windows based on specified indexes. 321 322 This method updates the `lai` attribute to include or exclude the specified genomic windows. 323 Attributes such as `window_sizes`, `centimorgan_pos`, `chromosomes`, and `physical_pos` will also be 324 updated accordingly if they are not None. The order of genomic windows is preserved. 325 326 Negative indexes are supported and follow 327 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 328 329 Args: 330 indexes (int or array-like of int): 331 Index(es) of the windows to include or exclude. Can be a single integer or a 332 sequence of integers. Negative indexes are supported. 333 include (bool, default=True): 334 If True, includes only the specified windows. If False, excludes the specified 335 windows. Default is True. 336 inplace (bool, default=False): 337 If True, modifies `self` in place. If False, returns a new `LocalAncestryObject` with 338 the windows filtered. Default is False. 339 340 Returns: 341 **Optional[LocalAncestryObject]:** 342 A new `LocalAncestryObject` with the specified windows filtered if `inplace=False`. 343 If `inplace=True`, modifies `self` in place and returns None. 344 """ 345 # Convert indexes to a NumPy array 346 indexes = np.atleast_1d(indexes) 347 348 # Get total number of windows 349 n_windows = self.n_windows 350 351 # Validate indexes, allowing negative indexes 352 if np.any((indexes < -n_windows) | (indexes >= n_windows)): 353 raise IndexError("One or more indexes are out of bounds.") 354 355 # Create boolean mask 356 mask = np.zeros(n_windows, dtype=bool) 357 mask[indexes] = True 358 359 # Invert mask if `include=False` 360 if not include: 361 mask = ~mask 362 363 # Filter `lai` 364 filtered_lai = self['lai'][mask, :] 365 366 # Filter `window_sizes`, `chromosomes`, `centimorgan_pos`, and `physical_pos`, checking if they are None before filtering 367 filtered_window_sizes = self['window_sizes'][mask] if self['window_sizes'] is not None else None 368 filtered_chromosomes = self['chromosomes'][mask] if self['chromosomes'] is not None else None 369 filtered_centimorgan_pos = self['centimorgan_pos'][mask, :] if self['centimorgan_pos'] is not None else None 370 filtered_physical_pos = self['physical_pos'][mask, :] if self['physical_pos'] is not None else None 371 372 # Modify the original object if `inplace=True`, otherwise create and return a copy 373 if inplace: 374 self['lai'] = filtered_lai 375 if filtered_window_sizes is not None: 376 self['window_sizes'] = filtered_window_sizes 377 if filtered_chromosomes is not None: 378 self['chromosomes'] = filtered_chromosomes 379 if filtered_centimorgan_pos is not None: 380 self['centimorgan_pos'] = filtered_centimorgan_pos 381 if filtered_physical_pos is not None: 382 self['physical_pos'] = filtered_physical_pos 383 return None 384 else: 385 laiobj = self.copy() 386 laiobj['lai'] = filtered_lai 387 if filtered_window_sizes is not None: 388 laiobj['window_sizes'] = filtered_window_sizes 389 if filtered_chromosomes is not None: 390 laiobj['chromosomes'] = filtered_chromosomes 391 if filtered_centimorgan_pos is not None: 392 laiobj['centimorgan_pos'] = filtered_centimorgan_pos 393 if filtered_physical_pos is not None: 394 laiobj['physical_pos'] = filtered_physical_pos 395 return laiobj
Filter genomic windows based on specified indexes.
This method updates the lai
attribute to include or exclude the specified genomic windows.
Attributes such as window_sizes
, centimorgan_pos
, chromosomes
, and physical_pos
will also be
updated accordingly if they are not None. The order of genomic windows is preserved.
Negative indexes are supported and follow NumPy's indexing conventions.
Arguments:
- indexes (int or array-like of int): Index(es) of the windows to include or exclude. Can be a single integer or a sequence of integers. Negative indexes are supported.
- include (bool, default=True): If True, includes only the specified windows. If False, excludes the specified windows. Default is True.
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newLocalAncestryObject
with the windows filtered. Default is False.
Returns:
Optional[LocalAncestryObject]: A new
LocalAncestryObject
with the specified windows filtered ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
397 def filter_samples( 398 self, 399 samples: Optional[Union[str, Sequence[str], np.ndarray, None]] = None, 400 indexes: Optional[Union[int, Sequence[int], np.ndarray, None]] = None, 401 include: bool = True, 402 inplace: bool = False 403 ) -> Optional['LocalAncestryObject']: 404 """ 405 Filter samples based on specified names or indexes. 406 407 This method updates the `lai`, `haplotypes`, and `samples` attributes to include or exclude the specified 408 samples. Each sample is associated with two haplotypes, which are included or excluded together. 409 The order of the samples is preserved. 410 411 If both samples and indexes are provided, any sample matching either a name in samples or an index in 412 indexes will be included or excluded. 413 414 Negative indexes are supported and follow 415 [NumPy's indexing conventions](https://numpy.org/doc/stable/user/basics.indexing.html). 416 417 Args: 418 samples (str or array_like of str, optional): 419 Name(s) of the samples to include or exclude. Can be a single sample name or a 420 sequence of sample names. Default is None. 421 indexes (int or array_like of int, optional): 422 Index(es) of the samples to include or exclude. Can be a single index or a sequence 423 of indexes. Negative indexes are supported. Default is None. 424 include (bool, default=True): 425 If True, includes only the specified samples. If False, excludes the specified 426 samples. Default is True. 427 inplace (bool, default=False): 428 If True, modifies `self` in place. If False, returns a new `LocalAncestryObject` with the 429 samples filtered. Default is False. 430 431 Returns: 432 **Optional[LocalAncestryObject]:** 433 A new `LocalAncestryObject` with the specified samples filtered if `inplace=False`. 434 If `inplace=True`, modifies `self` in place and returns None. 435 """ 436 if samples is None and indexes is None: 437 raise UserWarning("At least one of 'samples' or 'indexes' must be provided.") 438 439 n_haplotypes = self.n_haplotypes 440 n_samples = self.n_samples 441 442 # Create mask based on sample names 443 if samples is not None: 444 samples = np.atleast_1d(samples) 445 # Extract sample names from haplotype identifiers 446 haplotype_ids = np.array(self['haplotypes']) 447 sample_names = np.array([hap.split('.')[0] for hap in haplotype_ids]) 448 # Create mask for haplotypes belonging to specified samples 449 mask_samples = np.isin(sample_names, samples) 450 else: 451 mask_samples = np.zeros(n_haplotypes, dtype=bool) 452 453 # Create mask based on sample indexes 454 if indexes is not None: 455 indexes = np.atleast_1d(indexes) 456 457 # Validate indexes, allowing negative indexes 458 out_of_bounds_indexes = indexes[(indexes < -n_samples) | (indexes >= n_samples)] 459 if out_of_bounds_indexes.size > 0: 460 raise ValueError(f"One or more sample indexes are out of bounds.") 461 462 # Adjust negative indexes 463 indexes = np.mod(indexes, n_samples) 464 465 # Get haplotype indexes for the specified sample indexes 466 haplotype_indexes = np.concatenate([2*indexes, 2*indexes+1]) 467 # Create mask for haplotypes 468 mask_indexes = np.zeros(n_haplotypes, dtype=bool) 469 mask_indexes[haplotype_indexes] = True 470 else: 471 mask_indexes = np.zeros(n_haplotypes, dtype=bool) 472 473 # Combine masks using logical OR (union of samples) 474 mask_combined = mask_samples | mask_indexes 475 476 if not include: 477 mask_combined = ~mask_combined 478 479 # Filter `lai` 480 filtered_lai = self['lai'][:, mask_combined] 481 482 # Filter `haplotypes` 483 filtered_haplotypes = np.array(self['haplotypes'])[mask_combined].tolist() 484 485 # Filter `samples`, checking if they are None before filtering 486 sample_mask = mask_combined.reshape(-1, 2).any(axis=1) 487 filtered_samples = np.array(self['samples'])[sample_mask].tolist() if self['samples'] is not None else None 488 489 if inplace: 490 self['haplotypes'] = filtered_haplotypes 491 self['samples'] = filtered_samples 492 self['lai'] = filtered_lai 493 return None 494 else: 495 laiobj = self.copy() 496 laiobj['haplotypes'] = filtered_haplotypes 497 laiobj['samples'] = filtered_samples 498 laiobj['lai'] = filtered_lai 499 return laiobj
Filter samples based on specified names or indexes.
This method updates the lai
, haplotypes
, and samples
attributes to include or exclude the specified
samples. Each sample is associated with two haplotypes, which are included or excluded together.
The order of the samples is preserved.
If both samples and indexes are provided, any sample matching either a name in samples or an index in indexes will be included or excluded.
Negative indexes are supported and follow NumPy's indexing conventions.
Arguments:
- samples (str or array_like of str, optional): Name(s) of the samples to include or exclude. Can be a single sample name or a sequence of sample names. Default is None.
- indexes (int or array_like of int, optional): Index(es) of the samples to include or exclude. Can be a single index or a sequence of indexes. Negative indexes are supported. Default is None.
- include (bool, default=True): If True, includes only the specified samples. If False, excludes the specified samples. Default is True.
- inplace (bool, default=False): If True, modifies
self
in place. If False, returns a newLocalAncestryObject
with the samples filtered. Default is False.
Returns:
Optional[LocalAncestryObject]: A new
LocalAncestryObject
with the specified samples filtered ifinplace=False
. Ifinplace=True
, modifiesself
in place and returns None.
501 def convert_to_snp_level( 502 self, 503 snpobject: Optional['SNPObject'] = None, 504 variants_chrom: Optional[np.ndarray] = None, 505 variants_pos: Optional[np.ndarray] = None, 506 variants_ref: Optional[np.ndarray] = None, 507 variants_alt: Optional[np.ndarray] = None, 508 variants_filter_pass: Optional[np.ndarray] = None, 509 variants_id: Optional[np.ndarray] = None, 510 variants_qual: Optional[np.ndarray] = None, 511 lai_format: str = "3D" 512 ) -> 'SNPObject': 513 """ 514 Convert `self` into a `snputils.snp.genobj.SNPObject` SNP-level Local Ancestry Information (LAI), 515 with optional support for SNP data. 516 517 If SNP positions (`variants_pos`) and/or chromosomes (`variants_chrom`) are not specified, the method generates 518 SNPs uniformly across the start and end positions of each genomic window. Otherwise, the provided SNP 519 coordinates are used to assign ancestry values based on their respective windows. 520 521 If a `SNPObject` is provided, its attributes are used unless explicitly overridden by the function arguments. 522 In that case, the SNPObject is updated with the (optional) new attributes and the computed `calldata_lai`, then returned. 523 524 Args: 525 snpobject (SNPObject, optional): 526 An existing `SNPObject` to extract SNP attributes from. 527 variants_chrom (array of shape (n_snps,), optional): 528 An array containing the chromosome for each SNP. 529 variants_pos (array of shape (n_snps,), optional): 530 An array containing the chromosomal positions for each SNP. 531 variants_ref (array of shape (n_snps,), optional): 532 An array containing the reference allele for each SNP. 533 variants_alt (array of shape (n_snps,), optional): 534 An array containing the alternate allele for each SNP. 535 variants_filter_pass (array of shape (n_snps,), optional): 536 An array indicating whether each SNP passed control checks. 537 variants_id (array of shape (n_snps,), optional): 538 An array containing unique identifiers (IDs) for each SNP. 539 variants_qual (array of shape (n_snps,), optional): 540 An array containing the Phred-scaled quality score for each SNP. 541 lai_format (str, optional): 542 Determines the shape of `calldata_lai`: 543 - `"3D"` (default): Shape `(n_snps, n_samples, 2)`. 544 - `"2D"`: Shape `(n_snps, n_samples * 2)`. 545 546 Returns: 547 **SNPObject**: 548 A `SNPObject` containing SNP-level ancestry data and updated SNP attributes. 549 """ 550 from snputils.snp.genobj.snpobj import SNPObject 551 552 assert lai_format in {"2D", "3D"}, "Invalid `lai_format`. Must be '2D' or '3D'." 553 554 # Extract attributes from SNPObject if provided 555 if snpobject is not None: 556 variants_chrom = variants_chrom or snpobject.variants_chrom 557 variants_pos = variants_pos or snpobject.variants_pos 558 variants_ref = variants_ref or snpobject.variants_ref 559 variants_alt = variants_alt or snpobject.variants_alt 560 variants_filter_pass = variants_filter_pass or snpobject.variants_filter_pass 561 variants_id = variants_id or snpobject.variants_id 562 variants_qual = variants_qual or snpobject.variants_qual 563 564 n_samples = self.n_samples 565 lai_reshaped = self.lai.reshape(self.n_windows, n_samples, 2).astype(int) if lai_format == "3D" else None 566 567 if variants_pos is None or variants_chrom is None: 568 # Generate all SNP positions and corresponding chromosome labels between window boundaries 569 variants_pos_list = [] 570 variants_chrom_list = [] 571 ancestry_list = [] 572 573 for i in range(self.n_windows): 574 start = int(self.physical_pos[i, 0]) 575 end = int(self.physical_pos[i, 1]) 576 chrom = self.chromosomes[i] 577 578 # Generate SNP positions at each base pair within the window range 579 positions_in_window = np.arange(start, end + 1) 580 if positions_in_window.size == 0: 581 continue # Skip windows that contain no valid SNP positions 582 583 n_positions = positions_in_window.size 584 variants_pos_list.append(positions_in_window) 585 variants_chrom_list.append(np.full(n_positions, chrom)) 586 587 ancestry_repeated = ( 588 np.repeat(lai_reshaped[i, np.newaxis, :, :], n_positions, axis=0) 589 if lai_format == "3D" else np.repeat(self.lai[i, np.newaxis, :], n_positions, axis=0) 590 ) 591 ancestry_list.append(ancestry_repeated) 592 593 # Store SNP positions, their corresponding chromosome labels, and their associated ancestry 594 variants_pos = np.concatenate(variants_pos_list) 595 variants_chrom = np.concatenate(variants_chrom_list) 596 calldata_lai = np.concatenate(ancestry_list) 597 else: 598 # Use the provided SNP positions and chromosomes 599 n_snps = len(variants_pos) 600 if len(variants_chrom) != n_snps: 601 raise ValueError("`variants_pos` and `variants_chrom` must have the same length.") 602 603 # Initialize an array to store the corresponding window index for each SNP 604 # Default value is -1, meaning no matching window found 605 snp_to_window_indices = np.full(n_snps, -1, dtype=int) 606 607 # Identify unique chromosome names sorted by order of appearence 608 _, idx = np.unique(variants_chrom, return_index=True) 609 unique_chroms = variants_chrom[np.sort(idx)] 610 611 # Iterate through each unique chromosome to map SNPs to windows 612 for chrom in unique_chroms: 613 # Get indices of SNPs that belong to the current chromosome 614 snp_indices = np.where(variants_chrom == chrom)[0] 615 snp_pos_chr = variants_pos[snp_indices] 616 617 # Get indices of windows that belong to the current chromosome 618 window_indices = np.where(self.chromosomes == chrom)[0] 619 if window_indices.size == 0: 620 continue # Skip if no windows exist for this chromosome 621 622 # Extract start and end positions of the windows on this chromosome 623 window_starts_chr = self.physical_pos[:, 0][window_indices] 624 window_ends_chr = self.physical_pos[:, 1][window_indices] 625 626 # Find the right-most window that a SNP would fit into (sorted order) 627 inds = np.searchsorted(window_starts_chr, snp_pos_chr, side='right') - 1 628 629 # Mask valid SNPs: ensure they are within a valid range and fall inside window boundaries 630 valid_mask = (inds >= 0) & (inds < len(window_starts_chr)) & (snp_pos_chr <= window_ends_chr[inds]) 631 632 # Assign valid SNPs to their corresponding window indices 633 snp_to_window_indices[snp_indices[valid_mask]] = window_indices[inds[valid_mask]] 634 log.debug(f"Number of SNPs within window ranges for chromosome {chrom}: {valid_mask.sum()}") 635 636 # Initialize SNP-level ancestry array with NaNs as default 637 shape = (n_snps, n_samples, 2) if lai_format == "3D" else (n_snps, n_samples * 2) 638 calldata_lai = np.full(shape, np.nan, dtype='uint8') 639 640 # Assign ancestry values to SNPs with valid window assignments 641 valid_mask = (snp_to_window_indices != -1) 642 snp_indices = np.where(valid_mask)[0] 643 snp_to_window_indices = snp_to_window_indices[snp_indices] 644 645 if lai_format == "3D": 646 calldata_lai[snp_indices] = lai_reshaped[snp_to_window_indices] 647 else: # "2D" 648 calldata_lai[snp_indices] = self.lai[snp_to_window_indices] 649 650 if snpobject is not None: 651 # If a SNPObject was provided, update its attributes with any new values and add `calldata_lai`` 652 snpobject.variants_chrom = variants_chrom 653 snpobject.variants_pos = variants_pos 654 snpobject.variants_ref = variants_ref 655 snpobject.variants_alt = variants_alt 656 snpobject.variants_filter_pass = variants_filter_pass 657 snpobject.variants_id = variants_id 658 snpobject.variants_qual = variants_qual 659 snpobject.calldata_lai = calldata_lai 660 snpobject.ancestry_map = self.ancestry_map 661 return snpobject 662 else: 663 # Otherwise, create a new SNPObject 664 return SNPObject( 665 calldata_lai=calldata_lai.view(), 666 samples=self.samples, 667 variants_ref=variants_ref.view() if isinstance(variants_ref, np.ndarray) else variants_ref, 668 variants_alt=variants_alt.view() if isinstance(variants_alt, np.ndarray) else variants_alt, 669 variants_filter_pass=variants_filter_pass.view() if isinstance(variants_filter_pass, np.ndarray) else variants_filter_pass, 670 variants_chrom=variants_chrom.view(), 671 variants_id=variants_id.view() if isinstance(variants_id, np.ndarray) else variants_id, 672 variants_pos=variants_pos.view(), 673 variants_qual=variants_qual.view() if isinstance(variants_qual, np.ndarray) else variants_qual, 674 ancestry_map=self.ancestry_map 675 )
Convert self
into a snputils.snp.genobj.SNPObject
SNP-level Local Ancestry Information (LAI),
with optional support for SNP data.
If SNP positions (variants_pos
) and/or chromosomes (variants_chrom
) are not specified, the method generates
SNPs uniformly across the start and end positions of each genomic window. Otherwise, the provided SNP
coordinates are used to assign ancestry values based on their respective windows.
If a SNPObject
is provided, its attributes are used unless explicitly overridden by the function arguments.
In that case, the SNPObject is updated with the (optional) new attributes and the computed calldata_lai
, then returned.
Arguments:
- snpobject (SNPObject, optional): An existing
SNPObject
to extract SNP attributes from. - variants_chrom (array of shape (n_snps,), optional): An array containing the chromosome for each SNP.
- variants_pos (array of shape (n_snps,), optional): An array containing the chromosomal positions for each SNP.
- variants_ref (array of shape (n_snps,), optional): An array containing the reference allele for each SNP.
- variants_alt (array of shape (n_snps,), optional): An array containing the alternate allele for each SNP.
- variants_filter_pass (array of shape (n_snps,), optional): An array indicating whether each SNP passed control checks.
- variants_id (array of shape (n_snps,), optional): An array containing unique identifiers (IDs) for each SNP.
- variants_qual (array of shape (n_snps,), optional): An array containing the Phred-scaled quality score for each SNP.
- lai_format (str, optional): Determines the shape of
calldata_lai
:"3D"
(default): Shape(n_snps, n_samples, 2)
."2D"
: Shape(n_snps, n_samples * 2)
.
Returns:
SNPObject: A
SNPObject
containing SNP-level ancestry data and updated SNP attributes.
699 def save(self, file: Union[str, Path]) -> None: 700 """ 701 Save the data stored in `self` to a specified file. 702 If the file already exists, it will be overwritten. 703 704 The format of the saved file is determined by the file extension provided in the `file` 705 argument. 706 707 **Supported formats:** 708 709 - `.msp`: Text-based MSP format. 710 - `.msp.tsv`: Text-based MSP format with TSV extension. 711 - `.pkl`: Pickle format for saving `self` in serialized form. 712 713 Args: 714 file (str or pathlib.Path): 715 Path to the file where the data will be saved. The extension of the file determines the save format. 716 Supported extensions: `.msp`, `.msp.tsv`, `.pkl`. 717 """ 718 path = Path(file) 719 suffixes = [suffix.lower() for suffix in path.suffixes] 720 721 if suffixes[-2:] == ['.msp', '.tsv'] or suffixes[-1] == '.msp': 722 self.save_msp(file) 723 elif suffixes[-1] == '.pkl': 724 self.save_pickle(file) 725 else: 726 raise ValueError( 727 f"Unsupported file extension: {suffixes[-1]}" 728 "Supported extensions are: .msp, .msp.tsv, .pkl." 729 )
Save the data stored in self
to a specified file.
If the file already exists, it will be overwritten.
The format of the saved file is determined by the file extension provided in the file
argument.
Supported formats:
.msp
: Text-based MSP format..msp.tsv
: Text-based MSP format with TSV extension..pkl
: Pickle format for savingself
in serialized form.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. The extension of the file determines the save format.
Supported extensions:
.msp
,.msp.tsv
,.pkl
.
731 def save_msp(self, file: Union[str, Path]) -> None: 732 """ 733 Save the data stored in `self` to a `.msp` file. 734 If the file already exists, it will be overwritten. 735 736 Args: 737 file (str or pathlib.Path): 738 Path to the file where the data will be saved. It should end with `.msp` or `.msp.tsv`. 739 If the provided path does not have one of these extensions, the `.msp` extension will be appended. 740 """ 741 from snputils.ancestry.io.local.write import MSPWriter 742 743 MSPWriter(self, file).write()
Save the data stored in self
to a .msp
file.
If the file already exists, it will be overwritten.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.msp
or.msp.tsv
. If the provided path does not have one of these extensions, the.msp
extension will be appended.
745 def save_pickle(self, file: Union[str, Path]) -> None: 746 """ 747 Save `self` in serialized form to a `.pkl` file. 748 If the file already exists, it will be overwritten. 749 750 Args: 751 file (str or pathlib.Path): 752 Path to the file where the data will be saved. It should end with `.pkl`. 753 If the provided path does not have this extension, it will be appended. 754 """ 755 import pickle 756 with open(file, 'wb') as file: 757 pickle.dump(self, file)
Save self
in serialized form to a .pkl
file.
If the file already exists, it will be overwritten.
Arguments:
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.pkl
. If the provided path does not have this extension, it will be appended.