import gc
import logging
import warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Union
import numpy as np
import pandas as pd
import re
from .base import LAIBaseReader
from snputils.ancestry.genobj.local import LocalAncestryObject
log = logging.getLogger(__name__)
@dataclass
class MSPMetadata:
    """Structural metadata parsed from the header (and optional comment line) of an `.msp` file."""
    # All column names from the '#chm' header line, in file order.
    header: List[str]
    # Raw first line of the file when it precedes the header line; None when the
    # file starts directly with the '#chm' header.
    comment: Optional[str]
    # Index within `header` of the first haplotype (LAI) column.
    first_lai_col_indx: int
    # Haplotype column names (two per sample, e.g. 'Sample.0', 'Sample.1').
    haplotypes: List[str]
    # Unique sample identifiers derived from `haplotypes` with the 2-character suffix stripped.
    samples: List[str]
    # Mapping of ancestry code (as a string) to ancestry name, parsed from `comment`;
    # None when no comment line is present.
    ancestry_map: Optional[Dict[str, str]]
    # True when both 'spos' and 'epos' columns are present in the header.
    has_physical_pos: bool
    # True when both 'sgpos' and 'egpos' columns are present in the header.
    has_centimorgan_pos: bool
    # True when the 'n snps' column is present in the header.
    has_window_sizes: bool
class MSPReader(LAIBaseReader):
    """
    A reader class for parsing Local Ancestry Inference (LAI) data from an `.msp` or `msp.tsv` file
    and constructing a `snputils.ancestry.genobj.LocalAncestryObject`.
    """

    def __init__(self, file: Union[str, Path]) -> None:
        """
        Args:
            file (str or pathlib.Path):
                Path to the file to be read. It should end with `.msp` or `.msp.tsv`.
        """
        self.__file = Path(file)

    @property
    def file(self) -> Path:
        """
        Retrieve `file`.

        Returns:
            pathlib.Path:
                Path to the file to be read. It should end with `.msp` or `.msp.tsv`.
        """
        return self.__file

    def _get_samples(self, msp_df: pd.DataFrame, first_lai_col_indx: int) -> List[str]:
        """
        Extract unique sample identifiers from the pandas DataFrame.

        Args:
            msp_df (pd.DataFrame):
                The DataFrame representing the `.msp` data, including LAI columns.
            first_lai_col_indx (int):
                Index of the first column containing LAI data.

        Returns:
            **list:** List of unique sample identifiers.
        """
        # Get all columns starting from the first LAI data column
        query_samples_dub = msp_df.columns[first_lai_col_indx:]
        # Select only one of the maternal/paternal samples by taking every second column
        single_ind_idx = np.arange(0, len(query_samples_dub), 2)
        query_samples_sing = query_samples_dub[single_ind_idx]
        # Remove the 2-character suffix (e.g. '.0') to get clean sample identifiers
        query_samples = [qs[:-2] for qs in query_samples_sing]
        return query_samples

    def _get_samples_from_haplotypes(self, haplotypes: List[str]) -> List[str]:
        """
        Derive unique sample identifiers from a list of haplotype column names.

        Args:
            haplotypes (list of str):
                Haplotype column names, two consecutive entries per sample.

        Returns:
            list: One identifier per sample (the 2-character suffix is stripped).
        """
        query_samples_dub = np.asarray(haplotypes, dtype=object)
        # Take every second entry: one haplotype column per sample
        single_ind_idx = np.arange(0, len(query_samples_dub), 2)
        query_samples_sing = query_samples_dub[single_ind_idx]
        return [str(qs)[:-2] for qs in query_samples_sing]

    def _parse_header_and_comment(self) -> tuple[Optional[str], List[str]]:
        """
        Locate the header row (the line containing '#chm') within the first two lines.

        Returns:
            tuple: `(comment, header)` where `comment` is the raw first line when the
            header is on the second line (None otherwise), and `header` is the list of
            stripped, tab-separated column names.

        Raises:
            ValueError: If neither of the first two lines contains '#chm'.
        """
        # Explicit encoding for consistency with `iter_windows`, which reads the
        # same file with encoding="utf-8".
        with open(self.file, encoding="utf-8") as f:
            first_line = f.readline()
            second_line = f.readline()
        first_line_ = [h.strip() for h in first_line.split("\t")]
        second_line_ = [h.strip() for h in second_line.split("\t")]
        if "#chm" in first_line_:
            # No comment line; the file starts directly with the header
            return None, first_line_
        if "#chm" in second_line_:
            # First line is the ancestry-map comment, second is the header
            return first_line, second_line_
        raise ValueError(
            f"Header not found. Expected '#chm' in the first two lines. "
            f"First line: {first_line.strip()} | Second line: {second_line.strip()}"
        )

    def _get_first_lai_col_indx(self, header: List[str]) -> int:
        """
        Compute the index of the first haplotype (LAI) column in `header`.

        Counts the leading metadata columns: '#chm' is always assumed, plus the
        optional position pairs and window-size column when present.

        Args:
            header (list of str):
                Column names from the header line.

        Returns:
            int: Index of the first LAI column.
        """
        column_counter = 1  # '#chm'
        if "spos" in header and "epos" in header:
            column_counter += 2
        if "sgpos" in header and "egpos" in header:
            column_counter += 2
        if "n snps" in header:
            column_counter += 1
        return column_counter

    def read_metadata(self) -> MSPMetadata:
        """
        Parse header/comment structure of the `.msp` file without reading its data rows.

        Returns:
            MSPMetadata: Structural metadata (header, haplotypes, samples, ancestry map,
            and flags for the optional position/window-size columns).

        Raises:
            ValueError: If the header is missing or contains duplicate column names.
        """
        comment, header = self._parse_header_and_comment()
        if len(header) != len(set(header)):
            raise ValueError("Duplicate columns detected in the header.")
        first_lai_col_indx = self._get_first_lai_col_indx(header)
        haplotypes = header[first_lai_col_indx:]
        samples = self._get_samples_from_haplotypes(haplotypes)
        ancestry_map = self._get_ancestry_map_from_comment(comment) if comment is not None else None
        return MSPMetadata(
            header=header,
            comment=comment,
            first_lai_col_indx=first_lai_col_indx,
            haplotypes=haplotypes,
            samples=samples,
            ancestry_map=ancestry_map,
            has_physical_pos=("spos" in header and "epos" in header),
            has_centimorgan_pos=("sgpos" in header and "egpos" in header),
            has_window_sizes=("n snps" in header),
        )

    def iter_windows(
        self,
        chunk_size: int = 1024,
        sample_indices: Optional[np.ndarray] = None,
    ) -> Iterator[Dict[str, np.ndarray]]:
        """
        Stream the `.msp` file in chunks of genomic windows without loading it all in memory.

        Args:
            chunk_size (int):
                Maximum number of windows (rows) per yielded chunk. Default is 1024.
            sample_indices (np.ndarray, optional):
                Indices into the metadata sample list selecting a subset of samples;
                both haplotypes of each selected sample are returned. If None, all
                haplotype columns are included.

        Yields:
            dict: With keys:
                - 'window_indexes': int64 array of global window indexes for the chunk.
                - 'chromosomes': object array of chromosome labels, one per window.
                - 'physical_pos': int64 array of shape (n, 2) holding (spos, epos),
                  or None when the file lacks physical positions.
                - 'lai': uint8 array of shape (n, n_selected_haplotypes).

        Raises:
            ValueError: On invalid `chunk_size`/`sample_indices` or malformed rows.
        """
        metadata = self.read_metadata()
        if chunk_size < 1:
            raise ValueError("chunk_size must be >= 1.")
        header = metadata.header
        first_lai_col_indx = metadata.first_lai_col_indx
        column_index = {name: i for i, name in enumerate(header)}
        chrom_col_idx = column_index["#chm"]
        spos_col_idx: Optional[int] = None
        epos_col_idx: Optional[int] = None
        if metadata.has_physical_pos:
            spos_col_idx = column_index["spos"]
            epos_col_idx = column_index["epos"]
        if sample_indices is None:
            hap_col_indices = list(range(first_lai_col_indx, len(header)))
        else:
            sample_indices = np.asarray(sample_indices, dtype=np.int64)
            if sample_indices.size == 0:
                raise ValueError("sample_indices cannot be empty.")
            if np.any(sample_indices < 0) or np.any(sample_indices >= len(metadata.samples)):
                raise ValueError("sample_indices contain out-of-bounds sample indexes.")
            # Expand each sample index into its two consecutive haplotype columns
            hap_indices = np.empty(sample_indices.size * 2, dtype=np.int64)
            hap_indices[0::2] = 2 * sample_indices
            hap_indices[1::2] = 2 * sample_indices + 1
            hap_col_indices = (first_lai_col_indx + hap_indices).astype(np.int64).tolist()
        n_selected_haps = len(hap_col_indices)
        n_total_haps = len(metadata.haplotypes)
        all_haps_selected = (
            n_selected_haps == n_total_haps
            and n_selected_haps > 0
            and hap_col_indices[0] == first_lai_col_indx
            and hap_col_indices[-1] == (len(header) - 1)
        )
        # Pre-compute relative indices for the sample-subset path so the inner
        # loop can use a single C-level parse + numpy fancy indexing instead of
        # a Python for-loop over potentially millions of columns.
        if not all_haps_selected:
            _relative_hap_idx = np.array(hap_col_indices, dtype=np.intp) - first_lai_col_indx
        else:
            _relative_hap_idx = None
        row_in_chunk = 0
        window_start = 0
        chromosomes_chunk = np.empty(int(chunk_size), dtype=object)
        lai_chunk = np.empty((int(chunk_size), n_selected_haps), dtype=np.uint8)
        physical_pos_chunk = (
            np.empty((int(chunk_size), 2), dtype=np.int64)
            if metadata.has_physical_pos
            else None
        )
        with open(self.file, "r", encoding="utf-8") as handle:
            for line_no, raw_line in enumerate(handle, start=1):
                if not raw_line:
                    continue
                if raw_line.startswith("#"):
                    continue
                # Strip both LF and CR so Windows (CRLF) files parse cleanly;
                # the previous rstrip("\n") left a trailing "\r" on the last value.
                line = raw_line.rstrip("\r\n")
                if not line:
                    continue
                # Split only at the metadata/haplotype boundary, then hand the
                # haplotype tail to numpy in one call.
                fields = line.split("\t", first_lai_col_indx)
                if len(fields) != (first_lai_col_indx + 1):
                    raise ValueError(
                        f"Malformed MSP row at line {line_no}: expected {first_lai_col_indx + 1} "
                        f"prefix segments when parsing haplotypes."
                    )
                chromosomes_chunk[row_in_chunk] = fields[chrom_col_idx]
                if physical_pos_chunk is not None and spos_col_idx is not None and epos_col_idx is not None:
                    physical_pos_chunk[row_in_chunk, 0] = int(fields[spos_col_idx])
                    physical_pos_chunk[row_in_chunk, 1] = int(fields[epos_col_idx])
                # np.fromstring's text mode is deprecated; parse the tail by
                # splitting on tabs and casting the token array to uint8 instead.
                try:
                    lai_row = np.array(fields[first_lai_col_indx].split("\t"), dtype=np.uint8)
                except ValueError as exc:
                    raise ValueError(
                        f"Malformed MSP haplotype row at line {line_no}: "
                        f"non-integer haplotype value."
                    ) from exc
                if all_haps_selected:
                    if lai_row.size != n_selected_haps:
                        raise ValueError(
                            f"Malformed MSP haplotype row at line {line_no}: expected "
                            f"{n_selected_haps} haplotype values, got {lai_row.size}."
                        )
                    lai_chunk[row_in_chunk, :] = lai_row
                else:
                    if lai_row.size < n_total_haps:
                        raise ValueError(
                            f"Malformed MSP haplotype row at line {line_no}: expected at least "
                            f"{n_total_haps} haplotype values, got {lai_row.size}."
                        )
                    lai_chunk[row_in_chunk, :] = lai_row[_relative_hap_idx]
                row_in_chunk += 1
                if row_in_chunk == chunk_size:
                    window_indexes = np.arange(window_start, window_start + row_in_chunk, dtype=np.int64)
                    yield {
                        "window_indexes": window_indexes,
                        "chromosomes": chromosomes_chunk,
                        "physical_pos": physical_pos_chunk,
                        "lai": lai_chunk,
                    }
                    window_start += row_in_chunk
                    row_in_chunk = 0
                    # Allocate fresh buffers so the consumer may keep the
                    # previously yielded arrays without them being overwritten.
                    chromosomes_chunk = np.empty(int(chunk_size), dtype=object)
                    lai_chunk = np.empty((int(chunk_size), n_selected_haps), dtype=np.uint8)
                    if metadata.has_physical_pos:
                        physical_pos_chunk = np.empty((int(chunk_size), 2), dtype=np.int64)
                    else:
                        physical_pos_chunk = None
        # Flush the final, possibly partial, chunk
        if row_in_chunk > 0:
            window_indexes = np.arange(window_start, window_start + row_in_chunk, dtype=np.int64)
            yield {
                "window_indexes": window_indexes,
                "chromosomes": chromosomes_chunk[:row_in_chunk],
                "physical_pos": (
                    physical_pos_chunk[:row_in_chunk]
                    if physical_pos_chunk is not None
                    else None
                ),
                "lai": lai_chunk[:row_in_chunk],
            }

    def _get_ancestry_map_from_comment(self, comment: str) -> Dict[str, str]:
        """
        Construct an ancestry map from the comment line of the `.msp` file.

        This method parses the comment string to create a mapping of ancestry numerical
        identifiers to their corresponding ancestry names (e.g., '0': 'African').

        Args:
            comment (str):
                The comment line containing ancestry mapping information.

        Returns:
            dict: A dictionary mapping ancestry codes (as strings) to ancestry names.
        """
        comment = comment.strip()
        # Remove everything before the colon, if present
        if ':' in comment:
            comment = comment.split(':', 1)[1].strip()
        ancestry_map: Dict[str, str] = {}
        # Split on tabs, spaces, commas, semicolons or any combination of them
        tokens = [tok.strip() for tok in re.split(r'[,\t; ]+', comment) if tok]
        for tok in tokens:
            if '=' not in tok:
                continue  # Skip invalid pieces
            left, right = (p.strip() for p in tok.split('=', 1))
            # Detect whether format is "Pop=0" or "0=Pop"
            if left.isdigit() and not right.isdigit():
                ancestry_map[left] = right  # 0=Africa
            elif right.isdigit() and not left.isdigit():
                ancestry_map[right] = left  # Africa=0
            else:
                # Fallback (if both sides are digits or both are pops, keep left as code)
                ancestry_map[left] = right
        return ancestry_map

    def _replace_nan_with_none(self, array: Optional[np.ndarray]) -> Optional[np.ndarray]:
        """
        Replace arrays that are fully NaN (or fully empty strings/None) with `None`.

        Args:
            array (np.ndarray): Array to check.

        Returns:
            Optional[np.ndarray]: `None` if the array is empty or entirely NaN/empty,
            otherwise the original array.
        """
        if array is not None:
            if array.size == 0:  # Empty array carries no information
                return None
            if np.issubdtype(array.dtype, np.number):  # Numeric types
                if np.isnan(array).all():  # Fully NaN numeric array
                    return None
            elif array.dtype == np.object_ or np.issubdtype(array.dtype, np.str_):  # String/object types
                if np.all((array == '') | (array == None)):  # Empty or None entries
                    return None
        return array

    def read(self) -> 'LocalAncestryObject':
        """
        Read data from the provided `.msp` or `msp.tsv` `file` and construct a
        `snputils.ancestry.genobj.LocalAncestryObject`.

        Expected MSP content:
            The `.msp` file should contain local ancestry assignments for each haplotype across genomic windows.
            Each row should correspond to a genomic window and include the following columns:

            - `#chm`: Chromosome numbers corresponding to each genomic window.
            - `spos`: Start physical position for each window.
            - `epos`: End physical position for each window.
            - `sgpos`: Start centimorgan position for each window.
            - `egpos`: End centimorgan position for each window.
            - `n snps`: Number of SNPs in each genomic window.
            - `SampleID.0`: Local ancestry for the first haplotype of the sample for each window.
            - `SampleID.1`: Local ancestry for the second haplotype of the sample for each window.

        Returns:
            LocalAncestryObject:
                A LocalAncestryObject instance.
        """
        log.info(f"Reading '{self.file}'...")
        metadata = self.read_metadata()
        header = metadata.header
        first_lai_col_indx = metadata.first_lai_col_indx
        # Read the main data into a DataFrame; '#'-prefixed lines (the optional
        # ancestry comment and the header itself) are skipped by comment="#".
        msp_df = pd.read_csv(self.file, sep="\t", comment="#", names=header)
        # Extract chromosomes data
        chromosomes = msp_df['#chm'].astype(str).to_numpy()
        # Extract physical positions (if available)
        if metadata.has_physical_pos:
            physical_pos = msp_df[['spos', 'epos']].to_numpy()
        else:
            physical_pos = None
            log.warning("Physical positions ('spos' and 'epos') not found.")
        # Extract centimorgan positions (if available)
        if metadata.has_centimorgan_pos:
            centimorgan_pos = msp_df[['sgpos', 'egpos']].to_numpy()
        else:
            centimorgan_pos = None
            log.warning("Genetic (centimorgan) positions ('sgpos' and 'egpos') not found.")
        # Extract window sizes (if available)
        if metadata.has_window_sizes:
            window_sizes = msp_df['n snps'].to_numpy()
        else:
            window_sizes = None
            log.warning("Window sizes ('n snps') not found.")
        # Extract LAI data (haplotype-level): one row per window, one column per haplotype
        lai = msp_df.iloc[:, first_lai_col_indx:].to_numpy(dtype=np.uint8, copy=False)
        # Haplotype and sample identifiers come from the already-parsed metadata
        haplotypes = metadata.haplotypes
        samples = metadata.samples
        # Free the DataFrame before constructing the (potentially large) result object
        del msp_df
        gc.collect()
        # Validate the number of samples matches the LAI data dimensions.
        # Haplotypes come in pairs, so floor division is the exact expected count.
        n_samples = len(samples)
        expected_samples = lai.shape[1] // 2
        if n_samples != expected_samples:
            raise ValueError(
                "Mismatch between the number of sample identifiers and the expected number of samples in the LAI array. "
                f"Expected {expected_samples} samples (derived from LAI data); found {n_samples}."
            )
        # Count number of unique ancestries in the LAI data
        n_ancestries = len(np.unique(lai))
        # Ancestry map was already parsed from the comment (None when no comment line)
        ancestry_map = metadata.ancestry_map
        if ancestry_map is not None:
            if len(ancestry_map) != n_ancestries:
                warnings.warn(
                    "Mismatch between the number of unique ancestries in the LAI data "
                    f"({n_ancestries}) and the number of classes in the ancestry map "
                    f"({len(ancestry_map)})."
                )
        else:
            warnings.warn(
                "Ancestry map not found. It is recommended to provide an .msp file that contains the ancestry "
                "map as a comment in the first line."
            )
        # Replace fully NaN attributes with None
        window_sizes = self._replace_nan_with_none(window_sizes)
        centimorgan_pos = self._replace_nan_with_none(centimorgan_pos)
        chromosomes = self._replace_nan_with_none(chromosomes)
        physical_pos = self._replace_nan_with_none(physical_pos)
        return LocalAncestryObject(
            haplotypes=haplotypes,
            lai=lai,
            samples=samples,
            ancestry_map=ancestry_map,
            window_sizes=window_sizes,
            centimorgan_pos=centimorgan_pos,
            chromosomes=chromosomes,
            physical_pos=physical_pos
        )
# Register MSPReader with the LAIBaseReader ABC.
# NOTE(review): MSPReader already inherits from LAIBaseReader, so this virtual-subclass
# registration looks redundant — confirm whether LAIBaseReader relies on explicit
# registration (e.g. for a reader registry) before removing.
LAIBaseReader.register(MSPReader)