import logging
import copy
from typing import Any, List, Optional, Sequence, Tuple
import numpy as np
from snputils._utils.printing import format_repr
log = logging.getLogger(__name__)
[docs]
class IBDObject:
"""
A class for Identity-By-Descent (IBD) segment data.
"""
def __init__(
self,
sample_id_1: np.ndarray,
haplotype_id_1: np.ndarray,
sample_id_2: np.ndarray,
haplotype_id_2: np.ndarray,
chrom: np.ndarray,
start: np.ndarray,
end: np.ndarray,
length_cm: Optional[np.ndarray] = None,
segment_type: Optional[np.ndarray] = None,
) -> None:
"""
Args:
sample_id_1 (array of shape (n_segments,)): Sample identifiers for the first individual.
haplotype_id_1 (array of shape (n_segments,)): Haplotype identifiers for the first individual (values in {1, 2}, or -1 if unknown).
sample_id_2 (array of shape (n_segments,)): Sample identifiers for the second individual.
haplotype_id_2 (array of shape (n_segments,)): Haplotype identifiers for the second individual (values in {1, 2}, or -1 if unknown).
chrom (array of shape (n_segments,)): Chromosome identifier for each IBD segment.
start (array of shape (n_segments,)): Start physical position (1-based, bp) for each IBD segment.
end (array of shape (n_segments,)): End physical position (1-based, bp) for each IBD segment.
length_cm (array of shape (n_segments,), optional): Genetic length (cM) for each segment, if available.
"""
# Store attributes
self.__sample_id_1 = np.asarray(sample_id_1)
self.__haplotype_id_1 = np.asarray(haplotype_id_1)
self.__sample_id_2 = np.asarray(sample_id_2)
self.__haplotype_id_2 = np.asarray(haplotype_id_2)
self.__chrom = np.asarray(chrom)
self.__start = np.asarray(start)
self.__end = np.asarray(end)
self.__length_cm = None if length_cm is None else np.asarray(length_cm)
self.__segment_type = None if segment_type is None else np.asarray(segment_type)
self._sanity_check()
def __getitem__(self, key: str) -> Any:
"""
To access an attribute of the class using the square bracket notation,
similar to a dictionary.
"""
try:
return getattr(self, key)
except Exception:
raise KeyError(f"Invalid key: {key}.")
def __setitem__(self, key: str, value: Any) -> None:
"""
To set an attribute of the class using the square bracket notation,
similar to a dictionary.
"""
try:
setattr(self, key, value)
except Exception:
raise KeyError(f"Invalid key: {key}.")
def __repr__(self) -> str:
return format_repr(
self,
shape=self.shape,
n_segments=self.n_segments,
n_samples=self.n_samples,
n_chromosomes=self.n_chromosomes,
has_length_cm=self.__length_cm is not None,
has_segment_type=self.__segment_type is not None,
)
def __str__(self) -> str:
return self.__repr__()
@property
def sample_id_1(self) -> np.ndarray:
"""
Retrieve `sample_id_1`.
Returns:
array of shape (n_segments,): Sample identifiers for the first individual.
"""
return self.__sample_id_1
@sample_id_1.setter
def sample_id_1(self, x: Sequence) -> None:
"""
Update `sample_id_1`.
"""
self.__sample_id_1 = np.asarray(x)
@property
def haplotype_id_1(self) -> np.ndarray:
"""
Retrieve `haplotype_id_1`.
Returns:
array of shape (n_segments,): Haplotype identifiers for the first individual (values in {1, 2}).
"""
return self.__haplotype_id_1
@haplotype_id_1.setter
def haplotype_id_1(self, x: Sequence) -> None:
"""
Update `haplotype_id_1`.
"""
self.__haplotype_id_1 = np.asarray(x)
@property
def sample_id_2(self) -> np.ndarray:
"""
Retrieve `sample_id_2`.
Returns:
array of shape (n_segments,): Sample identifiers for the second individual.
"""
return self.__sample_id_2
@sample_id_2.setter
def sample_id_2(self, x: Sequence) -> None:
"""
Update `sample_id_2`.
"""
self.__sample_id_2 = np.asarray(x)
@property
def haplotype_id_2(self) -> np.ndarray:
"""
Retrieve `haplotype_id_2`.
Returns:
array of shape (n_segments,): Haplotype identifiers for the second individual (values in {1, 2}).
"""
return self.__haplotype_id_2
@haplotype_id_2.setter
def haplotype_id_2(self, x: Sequence) -> None:
"""
Update `haplotype_id_2`.
"""
self.__haplotype_id_2 = np.asarray(x)
@property
def chrom(self) -> np.ndarray:
"""
Retrieve `chrom`.
Returns:
array of shape (n_segments,): Chromosome identifier for each IBD segment.
"""
return self.__chrom
@chrom.setter
def chrom(self, x: Sequence) -> None:
"""
Update `chrom`.
"""
self.__chrom = np.asarray(x)
@property
def start(self) -> np.ndarray:
"""
Retrieve `start`.
Returns:
array of shape (n_segments,): Start physical position (1-based, bp) for each IBD segment.
"""
return self.__start
@start.setter
def start(self, x: Sequence) -> None:
"""
Update `start`.
"""
self.__start = np.asarray(x)
@property
def end(self) -> np.ndarray:
"""
Retrieve `end`.
Returns:
array of shape (n_segments,): End physical position (1-based, bp) for each IBD segment.
"""
return self.__end
@end.setter
def end(self, x: Sequence) -> None:
"""
Update `end`.
"""
self.__end = np.asarray(x)
@property
def length_cm(self) -> Optional[np.ndarray]:
"""
Retrieve `length_cm`.
Returns:
array of shape (n_segments,): Genetic length (cM) for each segment if available; otherwise None.
"""
return self.__length_cm
@length_cm.setter
def length_cm(self, x: Optional[Sequence]) -> None:
"""
Update `length_cm`.
"""
self.__length_cm = None if x is None else np.asarray(x)
@property
def segment_type(self) -> Optional[np.ndarray]:
"""
Retrieve `segment_type`.
Returns:
array of shape (n_segments,): Segment type labels (e.g., 'IBD1', 'IBD2'), or None if unavailable.
"""
return self.__segment_type
@segment_type.setter
def segment_type(self, x: Optional[Sequence]) -> None:
"""
Update `segment_type`.
"""
self.__segment_type = None if x is None else np.asarray(x)
@property
def n_segments(self) -> int:
"""
Retrieve `n_segments`.
Returns:
int: The total number of IBD segments.
"""
return self.__chrom.shape[0]
@property
def n_samples(self) -> int:
"""
Retrieve the number of unique samples represented in the segments.
"""
samples = np.concatenate([self.__sample_id_1, self.__sample_id_2])
return len(np.unique(samples))
@property
def n_chromosomes(self) -> int:
"""
Retrieve the number of unique chromosomes represented in the segments.
"""
return len(np.unique(self.__chrom))
@property
def shape(self) -> tuple[int]:
"""
Retrieve the one-dimensional segment shape.
"""
return (self.n_segments,)
@property
def pairs(self) -> np.ndarray:
"""
Retrieve `pairs`.
Returns:
array of shape (n_segments, 2): Per-segment sample identifier pairs.
"""
return np.column_stack([self.__sample_id_1, self.__sample_id_2])
@property
def haplotype_pairs(self) -> np.ndarray:
"""
Retrieve `haplotype_pairs`.
Returns:
array of shape (n_segments, 2): Per-segment haplotype identifier pairs.
"""
return np.column_stack([self.__haplotype_id_1, self.__haplotype_id_2])
[docs]
def copy(self) -> 'IBDObject':
"""
Create and return a copy of `self`.
Returns:
IBDObject: A new instance of the current object.
"""
return copy.deepcopy(self)
[docs]
def keys(self) -> List[str]:
"""
Retrieve a list of public attribute names for `self`.
Returns:
list of str: A list of attribute names, with internal name-mangling removed.
"""
return [attr.replace('_IBDObject__', '') for attr in vars(self)]
[docs]
def filter_segments(
self,
chrom: Optional[Sequence[str]] = None,
samples: Optional[Sequence[str]] = None,
min_length_cm: Optional[float] = None,
segment_types: Optional[Sequence[str]] = None,
inplace: bool = False,
) -> Optional['IBDObject']:
"""
Filter IBD segments by chromosome, sample names, and/or minimum genetic length.
Args:
chrom (sequence of str, optional): Chromosome(s) to include.
samples (sequence of str, optional): Sample names to include if present in either column.
min_length_cm (float, optional): Minimum cM length threshold.
inplace (bool, default=False): If True, modifies `self` in place. If False, returns a new `IBDObject`.
Returns:
Optional[IBDObject]: A filtered IBDObject if `inplace=False`. If `inplace=True`, returns None.
"""
mask = np.ones(self.n_segments, dtype=bool)
if chrom is not None:
chrom = np.atleast_1d(chrom)
mask &= np.isin(self.__chrom, chrom)
if samples is not None:
samples = np.atleast_1d(samples)
mask &= np.isin(self.__sample_id_1, samples) | np.isin(self.__sample_id_2, samples)
if min_length_cm is not None and self.__length_cm is not None:
mask &= self.__length_cm >= float(min_length_cm)
if segment_types is not None and self.__segment_type is not None:
segment_types = np.atleast_1d(segment_types)
mask &= np.isin(self.__segment_type, segment_types)
def _apply_mask(x: Optional[np.ndarray]) -> Optional[np.ndarray]:
return None if x is None else np.asarray(x)[mask]
if inplace:
self.__sample_id_1 = _apply_mask(self.__sample_id_1)
self.__haplotype_id_1 = _apply_mask(self.__haplotype_id_1)
self.__sample_id_2 = _apply_mask(self.__sample_id_2)
self.__haplotype_id_2 = _apply_mask(self.__haplotype_id_2)
self.__chrom = _apply_mask(self.__chrom)
self.__start = _apply_mask(self.__start)
self.__end = _apply_mask(self.__end)
self.__length_cm = _apply_mask(self.__length_cm)
self.__segment_type = _apply_mask(self.__segment_type)
return None
else:
return IBDObject(
sample_id_1=_apply_mask(self.__sample_id_1),
haplotype_id_1=_apply_mask(self.__haplotype_id_1),
sample_id_2=_apply_mask(self.__sample_id_2),
haplotype_id_2=_apply_mask(self.__haplotype_id_2),
chrom=_apply_mask(self.__chrom),
start=_apply_mask(self.__start),
end=_apply_mask(self.__end),
length_cm=_apply_mask(self.__length_cm),
segment_type=_apply_mask(self.__segment_type),
)
[docs]
def restrict_to_ancestry(
self,
*,
laiobj: Any,
ancestry: Any,
require_both_haplotypes: bool = False,
min_bp: Optional[int] = None,
min_cm: Optional[float] = None,
inplace: bool = False,
method: str = 'clip',
) -> Optional['IBDObject']:
"""
Filter and/or trim IBD segments to intervals where both individuals carry the specified ancestry
according to a `LocalAncestryObject`.
This performs an interval intersection per segment against ancestry
tracts. If haplotype IDs are known (for example Hap-IBD), ancestry is
checked on those specific haplotypes. If haplotype IDs are unknown
(for example ancIBD, where ``haplotype_id_* == -1``), ancestry is
considered present for an individual if at least one haplotype matches
the target ancestry, unless ``require_both_haplotypes=True``.
``method='strict'`` drops entire segments when any overlapping LAI
window has non-target ancestry for either individual.
``method='clip'`` trims segments to contiguous regions where both
individuals carry target ancestry, clipped to LAI window boundaries
and original IBD segment boundaries.
Args:
laiobj: LocalAncestryObject containing 2D `lai` of shape (n_windows, n_haplotypes),
`physical_pos` (n_windows, 2), and `chromosomes` (n_windows,).
ancestry: Target ancestry code or label. Compared as string, so both int and str work.
require_both_haplotypes: If True, require both haplotypes of each individual to have
the target ancestry within a window. When haplotypes are known per segment, this
only affects cases with unknown haplotypes (== -1) or IBD2 segments.
min_bp: Minimum base-pair length to retain a segment (strict) or subsegment (clip).
min_cm: Minimum centiMorgan length to retain a segment (strict) or subsegment (clip).
inplace: If True, replace `self` with the restricted object; else return a new object.
method: Method to use for filtering. 'strict' drops entire segments that overlap with
non-target ancestry. 'clip' trims segments to target ancestry regions.
Returns:
Optional[IBDObject]: A restricted IBDObject if `inplace=False`. If `inplace=True`,
returns None.
"""
if method not in ['strict', 'clip']:
raise ValueError(f"Method must be 'strict' or 'clip', got '{method}'")
# Basic LAI shape/metadata checks
lai = getattr(laiobj, 'lai', None)
physical_pos = getattr(laiobj, 'physical_pos', None)
chromosomes = getattr(laiobj, 'chromosomes', None)
centimorgan_pos = getattr(laiobj, 'centimorgan_pos', None)
haplotypes = getattr(laiobj, 'haplotypes', None)
if lai is None or physical_pos is None or chromosomes is None or haplotypes is None:
raise ValueError(
"`laiobj` must provide `lai`, `physical_pos`, `chromosomes`, and `haplotypes`."
)
if lai.ndim != 2:
raise ValueError("`laiobj.lai` must be 2D with shape (n_windows, n_haplotypes).")
# Build haplotype label -> column index map (labels like 'Sample.0', 'Sample.1')
hap_to_col = {str(h): i for i, h in enumerate(haplotypes)}
# Coerce ancestry to str for robust comparisons
anc_str = str(ancestry)
# Coerce LAI values to str once for comparisons
lai_str = lai.astype(str)
# Prepare arrays for the restricted segments
out_sample_id_1: List[str] = []
out_haplotype_id_1: List[int] = []
out_sample_id_2: List[str] = []
out_haplotype_id_2: List[int] = []
out_chrom: List[str] = []
out_start: List[int] = []
out_end: List[int] = []
out_length_cm: List[float] = []
out_segment_type: List[str] = [] if self.__segment_type is not None else None # type: ignore
# Vectorize chrom compare by making LAI chromosome strings
chr_lai = np.asarray(chromosomes).astype(str)
# Helper to compute cM length for a trimmed interval using LAI windows
def _approx_cm_len(chr_mask: np.ndarray, start_bp: int, end_bp: int) -> Optional[float]:
if centimorgan_pos is None:
return None
win_st = physical_pos[chr_mask, 0]
win_en = physical_pos[chr_mask, 1]
win_cm_st = centimorgan_pos[chr_mask, 0]
win_cm_en = centimorgan_pos[chr_mask, 1]
cm_total = 0.0
for ws, we, cs, ce in zip(win_st, win_en, win_cm_st, win_cm_en):
# Overlap with [start_bp, end_bp]
overlap_start = max(int(ws), int(start_bp))
overlap_end = min(int(we), int(end_bp))
if overlap_start > overlap_end:
continue
wlen_bp = max(1, int(we) - int(ws) + 1)
olen_bp = int(overlap_end) - int(overlap_start) + 1
frac = float(olen_bp) / float(wlen_bp)
cm_total += frac * float(ce - cs)
return cm_total
# Iterate over segments
for i in range(self.n_segments):
chrom = str(self.__chrom[i])
seg_start = int(self.__start[i])
seg_end = int(self.__end[i])
if seg_end < seg_start:
continue
# Subset LAI windows on this chromosome that overlap the segment
idx_chr = (chr_lai == chrom)
if not np.any(idx_chr):
continue
lai_st = physical_pos[idx_chr, 0]
lai_en = physical_pos[idx_chr, 1]
overlaps = (lai_en >= seg_start) & (lai_st <= seg_end)
if not np.any(overlaps):
continue
# Build per-window ancestry mask for both individuals
s1 = str(self.__sample_id_1[i])
s2 = str(self.__sample_id_2[i])
h1 = int(self.__haplotype_id_1[i]) if self.__haplotype_id_1 is not None else -1
h2 = int(self.__haplotype_id_2[i]) if self.__haplotype_id_2 is not None else -1
# Resolve haplotype column indices for each sample
# Known haplotypes are 1-based in inputs; convert to {0,1}
def _get_cols(sample: str) -> Tuple[int, int]:
a = hap_to_col.get(f"{sample}.0")
b = hap_to_col.get(f"{sample}.1")
if a is None or b is None:
raise ValueError(f"Sample '{sample}' not found in LAI haplotypes.")
return a, b
s1_a, s1_b = _get_cols(s1)
s2_a, s2_b = _get_cols(s2)
# LAI rows for this chromosome
lai_rows = lai_str[idx_chr, :]
# Determine ancestry presence per window for each individual
if h1 in (1, 2) and h2 in (1, 2):
# Use specific haplotypes
s1_col = s1_a if (h1 - 1) == 0 else s1_b
s2_col = s2_a if (h2 - 1) == 0 else s2_b
s1_mask = (lai_rows[:, s1_col] == anc_str)
s2_mask = (lai_rows[:, s2_col] == anc_str)
if require_both_haplotypes:
# Additionally require the other hap of each sample to match
s1_other = s1_b if s1_col == s1_a else s1_a
s2_other = s2_b if s2_col == s2_a else s2_a
s1_mask = s1_mask & (lai_rows[:, s1_other] == anc_str)
s2_mask = s2_mask & (lai_rows[:, s2_other] == anc_str)
else:
# Unknown hap IDs: require at least one hap to match (or both if requested)
if require_both_haplotypes:
s1_mask = (lai_rows[:, s1_a] == anc_str) & (lai_rows[:, s1_b] == anc_str)
s2_mask = (lai_rows[:, s2_a] == anc_str) & (lai_rows[:, s2_b] == anc_str)
else:
s1_mask = (lai_rows[:, s1_a] == anc_str) | (lai_rows[:, s1_b] == anc_str)
s2_mask = (lai_rows[:, s2_a] == anc_str) | (lai_rows[:, s2_b] == anc_str)
keep = overlaps & s1_mask & s2_mask
if method == 'strict':
# In strict mode, ALL overlapping windows must have target ancestry
if not np.array_equal(overlaps, keep):
continue # Drop entire segment
# Apply length filters to original segment
if min_bp is not None and (seg_end - seg_start + 1) < int(min_bp):
continue
# In strict mode, preserve original length_cm
cm_len = float(self.__length_cm[i]) if self.__length_cm is not None else None
if min_cm is not None:
if cm_len is None or cm_len < float(min_cm):
continue
# Keep entire original segment
out_sample_id_1.append(s1)
out_sample_id_2.append(s2)
out_haplotype_id_1.append(h1)
out_haplotype_id_2.append(h2)
out_chrom.append(chrom)
out_start.append(seg_start)
out_end.append(seg_end)
out_length_cm.append(float(cm_len) if cm_len is not None else float('nan'))
if out_segment_type is not None:
out_segment_type.append(str(self.__segment_type[i])) # type: ignore
else: # method == 'clip'
if not np.any(keep):
continue
# Identify contiguous windows where keep=True
idx_keep = np.where(keep)[0]
# Split into runs of consecutive indices
breaks = np.where(np.diff(idx_keep) > 1)[0]
run_starts = np.r_[0, breaks + 1]
run_ends = np.r_[breaks, idx_keep.size - 1]
# Create subsegments for each contiguous run
for rs, re in zip(run_starts, run_ends):
i0 = idx_keep[rs]
i1 = idx_keep[re]
sub_start = int(max(seg_start, int(lai_st[i0])))
sub_end = int(min(seg_end, int(lai_en[i1])))
if sub_end < sub_start:
continue
# Length filters: bp first
if min_bp is not None and (sub_end - sub_start + 1) < int(min_bp):
continue
# Compute cM length if possible, else approximate or None
cm_len = _approx_cm_len(idx_chr, sub_start, sub_end)
if cm_len is None and self.__length_cm is not None:
# Scale the original segment length by bp fraction
total_bp = max(1, int(seg_end - seg_start + 1))
frac_bp = float(sub_end - sub_start + 1) / float(total_bp)
try:
cm_len = float(self.__length_cm[i]) * frac_bp
except Exception:
cm_len = None
# Apply cM filter if requested (treat None as 0)
if min_cm is not None:
if cm_len is None or cm_len < float(min_cm):
continue
# Append trimmed segment
out_sample_id_1.append(s1)
out_sample_id_2.append(s2)
out_haplotype_id_1.append(h1)
out_haplotype_id_2.append(h2)
out_chrom.append(chrom)
out_start.append(sub_start)
out_end.append(sub_end)
out_length_cm.append(float(cm_len) if cm_len is not None else float('nan'))
if out_segment_type is not None:
out_segment_type.append(str(self.__segment_type[i])) # type: ignore
# If nothing remains, return empty object with zero segments
if len(out_start) == 0:
# Build minimal arrays
empty = IBDObject(
sample_id_1=np.array([], dtype=object),
haplotype_id_1=np.array([], dtype=int),
sample_id_2=np.array([], dtype=object),
haplotype_id_2=np.array([], dtype=int),
chrom=np.array([], dtype=object),
start=np.array([], dtype=int),
end=np.array([], dtype=int),
length_cm=None,
segment_type=None if out_segment_type is None else np.array([], dtype=object),
)
if inplace:
self.__sample_id_1 = empty.sample_id_1
self.__haplotype_id_1 = empty.haplotype_id_1
self.__sample_id_2 = empty.sample_id_2
self.__haplotype_id_2 = empty.haplotype_id_2
self.__chrom = empty.chrom
self.__start = empty.start
self.__end = empty.end
self.__length_cm = empty.length_cm
self.__segment_type = empty.segment_type
return None
return empty
# Assemble outputs
out_length_array: Optional[np.ndarray]
if len(out_length_cm) > 0:
# Convert NaNs to None-equivalent by using np.array with dtype float
out_length_array = np.asarray(out_length_cm, dtype=float)
else:
out_length_array = None
new_obj = IBDObject(
sample_id_1=np.asarray(out_sample_id_1, dtype=object),
haplotype_id_1=np.asarray(out_haplotype_id_1, dtype=int),
sample_id_2=np.asarray(out_sample_id_2, dtype=object),
haplotype_id_2=np.asarray(out_haplotype_id_2, dtype=int),
chrom=np.asarray(out_chrom, dtype=object),
start=np.asarray(out_start, dtype=int),
end=np.asarray(out_end, dtype=int),
length_cm=out_length_array,
segment_type=None if out_segment_type is None else np.asarray(out_segment_type, dtype=object),
)
if inplace:
self.__sample_id_1 = new_obj.sample_id_1
self.__haplotype_id_1 = new_obj.haplotype_id_1
self.__sample_id_2 = new_obj.sample_id_2
self.__haplotype_id_2 = new_obj.haplotype_id_2
self.__chrom = new_obj.chrom
self.__start = new_obj.start
self.__end = new_obj.end
self.__length_cm = new_obj.length_cm
self.__segment_type = new_obj.segment_type
return None
return new_obj
def _sanity_check(self) -> None:
"""
Perform sanity checks on the parsed data to ensure data integrity.
"""
n = self.__chrom.shape[0]
arrays = [
self.__sample_id_1,
self.__haplotype_id_1,
self.__sample_id_2,
self.__haplotype_id_2,
self.__start,
self.__end,
]
if any(arr.shape[0] != n for arr in arrays):
raise ValueError("All input arrays must have the same length.")
if self.__length_cm is not None and self.__length_cm.shape[0] != n:
raise ValueError("`length_cm` must have the same length as other arrays.")
if self.__segment_type is not None and self.__segment_type.shape[0] != n:
raise ValueError("`segment_type` must have the same length as other arrays.")
# Validate haplotype identifiers are 1 or 2, or -1 when unknown
valid_values = np.array([1, 2, -1])
if not np.isin(self.__haplotype_id_1, valid_values).all() or not np.isin(self.__haplotype_id_2, valid_values).all():
raise ValueError("Haplotype identifiers must be in {1, 2} or -1 if unknown.")