snputils.ibd
12class IBDObject: 13 """ 14 A class for Identity-By-Descent (IBD) segment data. 15 """ 16 17 def __init__( 18 self, 19 sample_id_1: np.ndarray, 20 haplotype_id_1: np.ndarray, 21 sample_id_2: np.ndarray, 22 haplotype_id_2: np.ndarray, 23 chrom: np.ndarray, 24 start: np.ndarray, 25 end: np.ndarray, 26 length_cm: Optional[np.ndarray] = None, 27 segment_type: Optional[np.ndarray] = None, 28 ) -> None: 29 """ 30 Args: 31 sample_id_1 (array of shape (n_segments,)): Sample identifiers for the first individual. 32 haplotype_id_1 (array of shape (n_segments,)): Haplotype identifiers for the first individual (values in {1, 2}, or -1 if unknown). 33 sample_id_2 (array of shape (n_segments,)): Sample identifiers for the second individual. 34 haplotype_id_2 (array of shape (n_segments,)): Haplotype identifiers for the second individual (values in {1, 2}, or -1 if unknown). 35 chrom (array of shape (n_segments,)): Chromosome identifier for each IBD segment. 36 start (array of shape (n_segments,)): Start physical position (1-based, bp) for each IBD segment. 37 end (array of shape (n_segments,)): End physical position (1-based, bp) for each IBD segment. 38 length_cm (array of shape (n_segments,), optional): Genetic length (cM) for each segment, if available. 39 """ 40 # Store attributes 41 self.__sample_id_1 = np.asarray(sample_id_1) 42 self.__haplotype_id_1 = np.asarray(haplotype_id_1) 43 self.__sample_id_2 = np.asarray(sample_id_2) 44 self.__haplotype_id_2 = np.asarray(haplotype_id_2) 45 self.__chrom = np.asarray(chrom) 46 self.__start = np.asarray(start) 47 self.__end = np.asarray(end) 48 self.__length_cm = None if length_cm is None else np.asarray(length_cm) 49 self.__segment_type = None if segment_type is None else np.asarray(segment_type) 50 51 self._sanity_check() 52 53 def __getitem__(self, key: str) -> Any: 54 """ 55 To access an attribute of the class using the square bracket notation, 56 similar to a dictionary. 57 """ 58 try: 59 return getattr(self, key) 60 except Exception: 61 raise KeyError(f"Invalid key: {key}.") 62 63 def __setitem__(self, key: str, value: Any) -> None: 64 """ 65 To set an attribute of the class using the square bracket notation, 66 similar to a dictionary. 67 """ 68 try: 69 setattr(self, key, value) 70 except Exception: 71 raise KeyError(f"Invalid key: {key}.") 72 73 @property 74 def sample_id_1(self) -> np.ndarray: 75 """ 76 Retrieve `sample_id_1`. 77 78 Returns: 79 **array of shape (n_segments,):** Sample identifiers for the first individual. 80 """ 81 return self.__sample_id_1 82 83 @sample_id_1.setter 84 def sample_id_1(self, x: Sequence) -> None: 85 """ 86 Update `sample_id_1`. 87 """ 88 self.__sample_id_1 = np.asarray(x) 89 90 @property 91 def haplotype_id_1(self) -> np.ndarray: 92 """ 93 Retrieve `haplotype_id_1`. 94 95 Returns: 96 **array of shape (n_segments,):** Haplotype identifiers for the first individual (values in {1, 2}). 97 """ 98 return self.__haplotype_id_1 99 100 @haplotype_id_1.setter 101 def haplotype_id_1(self, x: Sequence) -> None: 102 """ 103 Update `haplotype_id_1`. 104 """ 105 self.__haplotype_id_1 = np.asarray(x) 106 107 @property 108 def sample_id_2(self) -> np.ndarray: 109 """ 110 Retrieve `sample_id_2`. 111 112 Returns: 113 **array of shape (n_segments,):** Sample identifiers for the second individual. 114 """ 115 return self.__sample_id_2 116 117 @sample_id_2.setter 118 def sample_id_2(self, x: Sequence) -> None: 119 """ 120 Update `sample_id_2`. 121 """ 122 self.__sample_id_2 = np.asarray(x) 123 124 @property 125 def haplotype_id_2(self) -> np.ndarray: 126 """ 127 Retrieve `haplotype_id_2`. 128 129 Returns: 130 **array of shape (n_segments,):** Haplotype identifiers for the second individual (values in {1, 2}). 131 """ 132 return self.__haplotype_id_2 133 134 @haplotype_id_2.setter 135 def haplotype_id_2(self, x: Sequence) -> None: 136 """ 137 Update `haplotype_id_2`. 138 """ 139 self.__haplotype_id_2 = np.asarray(x) 140 141 @property 142 def chrom(self) -> np.ndarray: 143 """ 144 Retrieve `chrom`. 145 146 Returns: 147 **array of shape (n_segments,):** Chromosome identifier for each IBD segment. 148 """ 149 return self.__chrom 150 151 @chrom.setter 152 def chrom(self, x: Sequence) -> None: 153 """ 154 Update `chrom`. 155 """ 156 self.__chrom = np.asarray(x) 157 158 @property 159 def start(self) -> np.ndarray: 160 """ 161 Retrieve `start`. 162 163 Returns: 164 **array of shape (n_segments,):** Start physical position (1-based, bp) for each IBD segment. 165 """ 166 return self.__start 167 168 @start.setter 169 def start(self, x: Sequence) -> None: 170 """ 171 Update `start`. 172 """ 173 self.__start = np.asarray(x) 174 175 @property 176 def end(self) -> np.ndarray: 177 """ 178 Retrieve `end`. 179 180 Returns: 181 **array of shape (n_segments,):** End physical position (1-based, bp) for each IBD segment. 182 """ 183 return self.__end 184 185 @end.setter 186 def end(self, x: Sequence) -> None: 187 """ 188 Update `end`. 189 """ 190 self.__end = np.asarray(x) 191 192 @property 193 def length_cm(self) -> Optional[np.ndarray]: 194 """ 195 Retrieve `length_cm`. 196 197 Returns: 198 **array of shape (n_segments,):** Genetic length (cM) for each segment if available; otherwise None. 199 """ 200 return self.__length_cm 201 202 @length_cm.setter 203 def length_cm(self, x: Optional[Sequence]) -> None: 204 """ 205 Update `length_cm`. 206 """ 207 self.__length_cm = None if x is None else np.asarray(x) 208 209 @property 210 def segment_type(self) -> Optional[np.ndarray]: 211 """ 212 Retrieve `segment_type`. 213 214 Returns: 215 **array of shape (n_segments,):** Segment type labels (e.g., 'IBD1', 'IBD2'), or None if unavailable. 216 """ 217 return self.__segment_type 218 219 @segment_type.setter 220 def segment_type(self, x: Optional[Sequence]) -> None: 221 """ 222 Update `segment_type`. 223 """ 224 self.__segment_type = None if x is None else np.asarray(x) 225 226 @property 227 def n_segments(self) -> int: 228 """ 229 Retrieve `n_segments`. 230 231 Returns: 232 **int:** The total number of IBD segments. 233 """ 234 return self.__chrom.shape[0] 235 236 @property 237 def pairs(self) -> np.ndarray: 238 """ 239 Retrieve `pairs`. 240 241 Returns: 242 **array of shape (n_segments, 2):** Per-segment sample identifier pairs. 243 """ 244 return np.column_stack([self.__sample_id_1, self.__sample_id_2]) 245 246 @property 247 def haplotype_pairs(self) -> np.ndarray: 248 """ 249 Retrieve `haplotype_pairs`. 250 251 Returns: 252 **array of shape (n_segments, 2):** Per-segment haplotype identifier pairs. 253 """ 254 return np.column_stack([self.__haplotype_id_1, self.__haplotype_id_2]) 255 256 def copy(self) -> 'IBDObject': 257 """ 258 Create and return a copy of `self`. 259 260 Returns: 261 **IBDObject:** A new instance of the current object. 262 """ 263 return copy.deepcopy(self) 264 265 def keys(self) -> List[str]: 266 """ 267 Retrieve a list of public attribute names for `self`. 268 269 Returns: 270 **list of str:** A list of attribute names, with internal name-mangling removed. 271 """ 272 return [attr.replace('_IBDObject__', '') for attr in vars(self)] 273 274 def filter_segments( 275 self, 276 chrom: Optional[Sequence[str]] = None, 277 samples: Optional[Sequence[str]] = None, 278 min_length_cm: Optional[float] = None, 279 segment_types: Optional[Sequence[str]] = None, 280 inplace: bool = False, 281 ) -> Optional['IBDObject']: 282 """ 283 Filter IBD segments by chromosome, sample names, and/or minimum genetic length. 284 285 Args: 286 chrom (sequence of str, optional): Chromosome(s) to include. 287 samples (sequence of str, optional): Sample names to include if present in either column. 288 min_length_cm (float, optional): Minimum cM length threshold. 289 inplace (bool, default=False): If True, modifies `self` in place. If False, returns a new `IBDObject`. 290 291 Returns: 292 **Optional[IBDObject]:** A filtered IBDObject if `inplace=False`. If `inplace=True`, returns None. 293 """ 294 mask = np.ones(self.n_segments, dtype=bool) 295 296 if chrom is not None: 297 chrom = np.atleast_1d(chrom) 298 mask &= np.isin(self.__chrom, chrom) 299 300 if samples is not None: 301 samples = np.atleast_1d(samples) 302 mask &= np.isin(self.__sample_id_1, samples) | np.isin(self.__sample_id_2, samples) 303 304 if min_length_cm is not None and self.__length_cm is not None: 305 mask &= self.__length_cm >= float(min_length_cm) 306 307 if segment_types is not None and self.__segment_type is not None: 308 segment_types = np.atleast_1d(segment_types) 309 mask &= np.isin(self.__segment_type, segment_types) 310 311 def _apply_mask(x: Optional[np.ndarray]) -> Optional[np.ndarray]: 312 return None if x is None else np.asarray(x)[mask] 313 314 if inplace: 315 self.__sample_id_1 = _apply_mask(self.__sample_id_1) 316 self.__haplotype_id_1 = _apply_mask(self.__haplotype_id_1) 317 self.__sample_id_2 = _apply_mask(self.__sample_id_2) 318 self.__haplotype_id_2 = _apply_mask(self.__haplotype_id_2) 319 self.__chrom = _apply_mask(self.__chrom) 320 self.__start = _apply_mask(self.__start) 321 self.__end = _apply_mask(self.__end) 322 self.__length_cm = _apply_mask(self.__length_cm) 323 self.__segment_type = _apply_mask(self.__segment_type) 324 return None 325 else: 326 return IBDObject( 327 sample_id_1=_apply_mask(self.__sample_id_1), 328 haplotype_id_1=_apply_mask(self.__haplotype_id_1), 329 sample_id_2=_apply_mask(self.__sample_id_2), 330 haplotype_id_2=_apply_mask(self.__haplotype_id_2), 331 chrom=_apply_mask(self.__chrom), 332 start=_apply_mask(self.__start), 333 end=_apply_mask(self.__end), 334 length_cm=_apply_mask(self.__length_cm), 335 segment_type=_apply_mask(self.__segment_type), 336 ) 337 338 def restrict_to_ancestry( 339 self, 340 *, 341 laiobj: Any, 342 ancestry: Any, 343 require_both_haplotypes: bool = False, 344 min_bp: Optional[int] = None, 345 min_cm: Optional[float] = None, 346 inplace: bool = False, 347 method: str = 'clip', 348 ) -> Optional['IBDObject']: 349 """ 350 Filter and/or trim IBD segments to intervals where both individuals carry the specified ancestry 351 according to a `LocalAncestryObject`. 352 353 This performs an interval intersection per segment against ancestry tracts: 354 - If haplotype IDs are known (e.g., Hap-IBD), ancestry is checked on the specific 355 haplotype of each individual. 356 - If haplotype IDs are unknown (e.g., ancIBD; haplotype_id_* == -1), ancestry is 357 considered present for an individual if at least one of their haplotypes matches 358 the requested ancestry (unless `require_both_haplotypes=True`). 359 360 Method 'strict': 361 Drop entire IBD segments if ANY overlapping LAI window contains non-target ancestry 362 for either individual. No trimming occurs - segments are kept whole or dropped completely. 363 364 Method 'clip': 365 Trim IBD segments to contiguous regions where both individuals have the target ancestry. 366 Resulting subsegments are clipped to LAI window boundaries and original IBD start/end, 367 with optional length filtering by bp or cM. 368 369 Args: 370 laiobj: LocalAncestryObject containing 2D `lai` of shape (n_windows, n_haplotypes), 371 `physical_pos` (n_windows, 2), and `chromosomes` (n_windows,). 372 ancestry: Target ancestry code or label. Compared as string, so both int and str work. 373 require_both_haplotypes: If True, require both haplotypes of each individual to have 374 the target ancestry within a window. When haplotypes are known per segment, this 375 only affects cases with unknown haplotypes (== -1) or IBD2 segments. 376 min_bp: Minimum base-pair length to retain a segment (strict) or subsegment (clip). 377 min_cm: Minimum centiMorgan length to retain a segment (strict) or subsegment (clip). 378 inplace: If True, replace `self` with the restricted object; else return a new object. 379 method: Method to use for filtering. 'strict' drops entire segments that overlap with 380 non-target ancestry. 'clip' trims segments to target ancestry regions. 381 382 Returns: 383 Optional[IBDObject]: A restricted IBDObject if `inplace=False`. If `inplace=True`, 384 returns None. 385 """ 386 if method not in ['strict', 'clip']: 387 raise ValueError(f"Method must be 'strict' or 'clip', got '{method}'") 388 389 # Basic LAI shape/metadata checks 390 lai = getattr(laiobj, 'lai', None) 391 physical_pos = getattr(laiobj, 'physical_pos', None) 392 chromosomes = getattr(laiobj, 'chromosomes', None) 393 centimorgan_pos = getattr(laiobj, 'centimorgan_pos', None) 394 haplotypes = getattr(laiobj, 'haplotypes', None) 395 396 if lai is None or physical_pos is None or chromosomes is None or haplotypes is None: 397 raise ValueError( 398 "`laiobj` must provide `lai`, `physical_pos`, `chromosomes`, and `haplotypes`." 399 ) 400 401 if lai.ndim != 2: 402 raise ValueError("`laiobj.lai` must be 2D with shape (n_windows, n_haplotypes).") 403 404 # Build haplotype label -> column index map (labels like 'Sample.0', 'Sample.1') 405 hap_to_col = {str(h): i for i, h in enumerate(haplotypes)} 406 407 # Coerce ancestry to str for robust comparisons 408 anc_str = str(ancestry) 409 410 # Coerce LAI values to str once for comparisons 411 lai_str = lai.astype(str) 412 413 # Prepare arrays for the restricted segments 414 out_sample_id_1: List[str] = [] 415 out_haplotype_id_1: List[int] = [] 416 out_sample_id_2: List[str] = [] 417 out_haplotype_id_2: List[int] = [] 418 out_chrom: List[str] = [] 419 out_start: List[int] = [] 420 out_end: List[int] = [] 421 out_length_cm: List[float] = [] 422 out_segment_type: List[str] = [] if self.__segment_type is not None else None # type: ignore 423 424 # Vectorize chrom compare by making LAI chromosome strings 425 chr_lai = np.asarray(chromosomes).astype(str) 426 427 # Helper to compute cM length for a trimmed interval using LAI windows 428 def _approx_cm_len(chr_mask: np.ndarray, start_bp: int, end_bp: int) -> Optional[float]: 429 if centimorgan_pos is None: 430 return None 431 win_st = physical_pos[chr_mask, 0] 432 win_en = physical_pos[chr_mask, 1] 433 win_cm_st = centimorgan_pos[chr_mask, 0] 434 win_cm_en = centimorgan_pos[chr_mask, 1] 435 cm_total = 0.0 436 for ws, we, cs, ce in zip(win_st, win_en, win_cm_st, win_cm_en): 437 # Overlap with [start_bp, end_bp] 438 overlap_start = max(int(ws), int(start_bp)) 439 overlap_end = min(int(we), int(end_bp)) 440 if overlap_start > overlap_end: 441 continue 442 wlen_bp = max(1, int(we) - int(ws) + 1) 443 olen_bp = int(overlap_end) - int(overlap_start) + 1 444 frac = float(olen_bp) / float(wlen_bp) 445 cm_total += frac * float(ce - cs) 446 return cm_total 447 448 # Iterate over segments 449 for i in range(self.n_segments): 450 chrom = str(self.__chrom[i]) 451 seg_start = int(self.__start[i]) 452 seg_end = int(self.__end[i]) 453 if seg_end < seg_start: 454 continue 455 456 # Subset LAI windows on this chromosome that overlap the segment 457 idx_chr = (chr_lai == chrom) 458 if not np.any(idx_chr): 459 continue 460 lai_st = physical_pos[idx_chr, 0] 461 lai_en = physical_pos[idx_chr, 1] 462 overlaps = (lai_en >= seg_start) & (lai_st <= seg_end) 463 if not np.any(overlaps): 464 continue 465 466 # Build per-window ancestry mask for both individuals 467 s1 = str(self.__sample_id_1[i]) 468 s2 = str(self.__sample_id_2[i]) 469 h1 = int(self.__haplotype_id_1[i]) if self.__haplotype_id_1 is not None else -1 470 h2 = int(self.__haplotype_id_2[i]) if self.__haplotype_id_2 is not None else -1 471 472 # Resolve haplotype column indices for each sample 473 # Known haplotypes are 1-based in inputs; convert to {0,1} 474 def _get_cols(sample: str) -> Tuple[int, int]: 475 a = hap_to_col.get(f"{sample}.0") 476 b = hap_to_col.get(f"{sample}.1") 477 if a is None or b is None: 478 raise ValueError(f"Sample '{sample}' not found in LAI haplotypes.") 479 return a, b 480 481 s1_a, s1_b = _get_cols(s1) 482 s2_a, s2_b = _get_cols(s2) 483 484 # LAI rows for this chromosome 485 lai_rows = lai_str[idx_chr, :] 486 487 # Determine ancestry presence per window for each individual 488 if h1 in (1, 2) and h2 in (1, 2): 489 # Use specific haplotypes 490 s1_col = s1_a if (h1 - 1) == 0 else s1_b 491 s2_col = s2_a if (h2 - 1) == 0 else s2_b 492 s1_mask = (lai_rows[:, s1_col] == anc_str) 493 s2_mask = (lai_rows[:, s2_col] == anc_str) 494 if require_both_haplotypes: 495 # Additionally require the other hap of each sample to match 496 s1_other = s1_b if s1_col == s1_a else s1_a 497 s2_other = s2_b if s2_col == s2_a else s2_a 498 s1_mask = s1_mask & (lai_rows[:, s1_other] == anc_str) 499 s2_mask = s2_mask & (lai_rows[:, s2_other] == anc_str) 500 else: 501 # Unknown hap IDs: require at least one hap to match (or both if requested) 502 if require_both_haplotypes: 503 s1_mask = (lai_rows[:, s1_a] == anc_str) & (lai_rows[:, s1_b] == anc_str) 504 s2_mask = (lai_rows[:, s2_a] == anc_str) & (lai_rows[:, s2_b] == anc_str) 505 else: 506 s1_mask = (lai_rows[:, s1_a] == anc_str) | (lai_rows[:, s1_b] == anc_str) 507 s2_mask = (lai_rows[:, s2_a] == anc_str) | (lai_rows[:, s2_b] == anc_str) 508 509 keep = overlaps & s1_mask & s2_mask 510 511 if method == 'strict': 512 # In strict mode, ALL overlapping windows must have target ancestry 513 if not np.array_equal(overlaps, keep): 514 continue # Drop entire segment 515 516 # Apply length filters to original segment 517 if min_bp is not None and (seg_end - seg_start + 1) < int(min_bp): 518 continue 519 520 # In strict mode, preserve original length_cm 521 cm_len = float(self.__length_cm[i]) if self.__length_cm is not None else None 522 523 if min_cm is not None: 524 if cm_len is None or cm_len < float(min_cm): 525 continue 526 527 # Keep entire original segment 528 out_sample_id_1.append(s1) 529 out_sample_id_2.append(s2) 530 out_haplotype_id_1.append(h1) 531 out_haplotype_id_2.append(h2) 532 out_chrom.append(chrom) 533 out_start.append(seg_start) 534 out_end.append(seg_end) 535 out_length_cm.append(float(cm_len) if cm_len is not None else float('nan')) 536 if out_segment_type is not None: 537 out_segment_type.append(str(self.__segment_type[i])) # type: ignore 538 539 else: # method == 'clip' 540 if not np.any(keep): 541 continue 542 543 # Identify contiguous windows where keep=True 544 idx_keep = np.where(keep)[0] 545 # Split into runs of consecutive indices 546 breaks = np.where(np.diff(idx_keep) > 1)[0] 547 run_starts = np.r_[0, breaks + 1] 548 run_ends = np.r_[breaks, idx_keep.size - 1] 549 550 # Create subsegments for each contiguous run 551 for rs, re in zip(run_starts, run_ends): 552 i0 = idx_keep[rs] 553 i1 = idx_keep[re] 554 sub_start = int(max(seg_start, int(lai_st[i0]))) 555 sub_end = int(min(seg_end, int(lai_en[i1]))) 556 if sub_end < sub_start: 557 continue 558 559 # Length filters: bp first 560 if min_bp is not None and (sub_end - sub_start + 1) < int(min_bp): 561 continue 562 563 # Compute cM length if possible, else approximate or None 564 cm_len = _approx_cm_len(idx_chr, sub_start, sub_end) 565 if cm_len is None and self.__length_cm is not None: 566 # Scale the original segment length by bp fraction 567 total_bp = max(1, int(seg_end - seg_start + 1)) 568 frac_bp = float(sub_end - sub_start + 1) / float(total_bp) 569 try: 570 cm_len = float(self.__length_cm[i]) * frac_bp 571 except Exception: 572 cm_len = None 573 574 # Apply cM filter if requested (treat None as 0) 575 if min_cm is not None: 576 if cm_len is None or cm_len < float(min_cm): 577 continue 578 579 # Append trimmed segment 580 out_sample_id_1.append(s1) 581 out_sample_id_2.append(s2) 582 out_haplotype_id_1.append(h1) 583 out_haplotype_id_2.append(h2) 584 out_chrom.append(chrom) 585 out_start.append(sub_start) 586 out_end.append(sub_end) 587 out_length_cm.append(float(cm_len) if cm_len is not None else float('nan')) 588 if out_segment_type is not None: 589 out_segment_type.append(str(self.__segment_type[i])) # type: ignore 590 591 # If nothing remains, return empty object with zero segments 592 if len(out_start) == 0: 593 # Build minimal arrays 594 empty = IBDObject( 595 sample_id_1=np.array([], dtype=object), 596 haplotype_id_1=np.array([], dtype=int), 597 sample_id_2=np.array([], dtype=object), 598 haplotype_id_2=np.array([], dtype=int), 599 chrom=np.array([], dtype=object), 600 start=np.array([], dtype=int), 601 end=np.array([], dtype=int), 602 length_cm=None, 603 segment_type=None if out_segment_type is None else np.array([], dtype=object), 604 ) 605 if inplace: 606 self.__sample_id_1 = empty.sample_id_1 607 self.__haplotype_id_1 = empty.haplotype_id_1 608 self.__sample_id_2 = empty.sample_id_2 609 self.__haplotype_id_2 = empty.haplotype_id_2 610 self.__chrom = empty.chrom 611 self.__start = empty.start 612 self.__end = empty.end 613 self.__length_cm = empty.length_cm 614 self.__segment_type = empty.segment_type 615 return None 616 return empty 617 618 # Assemble outputs 619 out_length_array: Optional[np.ndarray] 620 if len(out_length_cm) > 0: 621 # Convert NaNs to None-equivalent by using np.array with dtype float 622 out_length_array = np.asarray(out_length_cm, dtype=float) 623 else: 624 out_length_array = None 625 626 new_obj = IBDObject( 627 sample_id_1=np.asarray(out_sample_id_1, dtype=object), 628 haplotype_id_1=np.asarray(out_haplotype_id_1, dtype=int), 629 sample_id_2=np.asarray(out_sample_id_2, dtype=object), 630 haplotype_id_2=np.asarray(out_haplotype_id_2, dtype=int), 631 chrom=np.asarray(out_chrom, dtype=object), 632 start=np.asarray(out_start, dtype=int), 633 end=np.asarray(out_end, dtype=int), 634 length_cm=out_length_array, 635 segment_type=None if out_segment_type is None else np.asarray(out_segment_type, dtype=object), 636 ) 637 638 if inplace: 639 self.__sample_id_1 = new_obj.sample_id_1 640 self.__haplotype_id_1 = new_obj.haplotype_id_1 641 self.__sample_id_2 = new_obj.sample_id_2 642 self.__haplotype_id_2 = new_obj.haplotype_id_2 643 self.__chrom = new_obj.chrom 644 self.__start = new_obj.start 645 self.__end = new_obj.end 646 self.__length_cm = new_obj.length_cm 647 self.__segment_type = new_obj.segment_type 648 return None 649 return new_obj 650 651 def _sanity_check(self) -> None: 652 """ 653 Perform sanity checks on the parsed data to ensure data integrity. 654 """ 655 n = self.__chrom.shape[0] 656 arrays = [ 657 self.__sample_id_1, 658 self.__haplotype_id_1, 659 self.__sample_id_2, 660 self.__haplotype_id_2, 661 self.__start, 662 self.__end, 663 ] 664 if any(arr.shape[0] != n for arr in arrays): 665 raise ValueError("All input arrays must have the same length.") 666 667 if self.__length_cm is not None and self.__length_cm.shape[0] != n: 668 raise ValueError("`length_cm` must have the same length as other arrays.") 669 670 if self.__segment_type is not None and self.__segment_type.shape[0] != n: 671 raise ValueError("`segment_type` must have the same length as other arrays.") 672 673 # Validate haplotype identifiers are 1 or 2, or -1 when unknown 674 valid_values = np.array([1, 2, -1]) 675 if not np.isin(self.__haplotype_id_1, valid_values).all() or not np.isin(self.__haplotype_id_2, valid_values).all(): 676 raise ValueError("Haplotype identifiers must be in {1, 2} or -1 if unknown.")
A class for Identity-By-Descent (IBD) segment data.
17 def __init__( 18 self, 19 sample_id_1: np.ndarray, 20 haplotype_id_1: np.ndarray, 21 sample_id_2: np.ndarray, 22 haplotype_id_2: np.ndarray, 23 chrom: np.ndarray, 24 start: np.ndarray, 25 end: np.ndarray, 26 length_cm: Optional[np.ndarray] = None, 27 segment_type: Optional[np.ndarray] = None, 28 ) -> None: 29 """ 30 Args: 31 sample_id_1 (array of shape (n_segments,)): Sample identifiers for the first individual. 32 haplotype_id_1 (array of shape (n_segments,)): Haplotype identifiers for the first individual (values in {1, 2}, or -1 if unknown). 33 sample_id_2 (array of shape (n_segments,)): Sample identifiers for the second individual. 34 haplotype_id_2 (array of shape (n_segments,)): Haplotype identifiers for the second individual (values in {1, 2}, or -1 if unknown). 35 chrom (array of shape (n_segments,)): Chromosome identifier for each IBD segment. 36 start (array of shape (n_segments,)): Start physical position (1-based, bp) for each IBD segment. 37 end (array of shape (n_segments,)): End physical position (1-based, bp) for each IBD segment. 38 length_cm (array of shape (n_segments,), optional): Genetic length (cM) for each segment, if available. 39 """ 40 # Store attributes 41 self.__sample_id_1 = np.asarray(sample_id_1) 42 self.__haplotype_id_1 = np.asarray(haplotype_id_1) 43 self.__sample_id_2 = np.asarray(sample_id_2) 44 self.__haplotype_id_2 = np.asarray(haplotype_id_2) 45 self.__chrom = np.asarray(chrom) 46 self.__start = np.asarray(start) 47 self.__end = np.asarray(end) 48 self.__length_cm = None if length_cm is None else np.asarray(length_cm) 49 self.__segment_type = None if segment_type is None else np.asarray(segment_type) 50 51 self._sanity_check()
Arguments:
- sample_id_1 (array of shape (n_segments,)): Sample identifiers for the first individual.
- haplotype_id_1 (array of shape (n_segments,)): Haplotype identifiers for the first individual (values in {1, 2}, or -1 if unknown).
- sample_id_2 (array of shape (n_segments,)): Sample identifiers for the second individual.
- haplotype_id_2 (array of shape (n_segments,)): Haplotype identifiers for the second individual (values in {1, 2}, or -1 if unknown).
- chrom (array of shape (n_segments,)): Chromosome identifier for each IBD segment.
- start (array of shape (n_segments,)): Start physical position (1-based, bp) for each IBD segment.
- end (array of shape (n_segments,)): End physical position (1-based, bp) for each IBD segment.
- length_cm (array of shape (n_segments,), optional): Genetic length (cM) for each segment, if available.
73 @property 74 def sample_id_1(self) -> np.ndarray: 75 """ 76 Retrieve `sample_id_1`. 77 78 Returns: 79 **array of shape (n_segments,):** Sample identifiers for the first individual. 80 """ 81 return self.__sample_id_1
Retrieve sample_id_1.
Returns:
array of shape (n_segments,): Sample identifiers for the first individual.
90 @property 91 def haplotype_id_1(self) -> np.ndarray: 92 """ 93 Retrieve `haplotype_id_1`. 94 95 Returns: 96 **array of shape (n_segments,):** Haplotype identifiers for the first individual (values in {1, 2}). 97 """ 98 return self.__haplotype_id_1
Retrieve haplotype_id_1.
Returns:
array of shape (n_segments,): Haplotype identifiers for the first individual (values in {1, 2}).
107 @property 108 def sample_id_2(self) -> np.ndarray: 109 """ 110 Retrieve `sample_id_2`. 111 112 Returns: 113 **array of shape (n_segments,):** Sample identifiers for the second individual. 114 """ 115 return self.__sample_id_2
Retrieve sample_id_2.
Returns:
array of shape (n_segments,): Sample identifiers for the second individual.
124 @property 125 def haplotype_id_2(self) -> np.ndarray: 126 """ 127 Retrieve `haplotype_id_2`. 128 129 Returns: 130 **array of shape (n_segments,):** Haplotype identifiers for the second individual (values in {1, 2}). 131 """ 132 return self.__haplotype_id_2
Retrieve haplotype_id_2.
Returns:
array of shape (n_segments,): Haplotype identifiers for the second individual (values in {1, 2}).
158 @property 159 def start(self) -> np.ndarray: 160 """ 161 Retrieve `start`. 162 163 Returns: 164 **array of shape (n_segments,):** Start physical position (1-based, bp) for each IBD segment. 165 """ 166 return self.__start
Retrieve start.
Returns:
array of shape (n_segments,): Start physical position (1-based, bp) for each IBD segment.
175 @property 176 def end(self) -> np.ndarray: 177 """ 178 Retrieve `end`. 179 180 Returns: 181 **array of shape (n_segments,):** End physical position (1-based, bp) for each IBD segment. 182 """ 183 return self.__end
Retrieve end.
Returns:
array of shape (n_segments,): End physical position (1-based, bp) for each IBD segment.
192 @property 193 def length_cm(self) -> Optional[np.ndarray]: 194 """ 195 Retrieve `length_cm`. 196 197 Returns: 198 **array of shape (n_segments,):** Genetic length (cM) for each segment if available; otherwise None. 199 """ 200 return self.__length_cm
Retrieve length_cm.
Returns:
array of shape (n_segments,): Genetic length (cM) for each segment if available; otherwise None.
209 @property 210 def segment_type(self) -> Optional[np.ndarray]: 211 """ 212 Retrieve `segment_type`. 213 214 Returns: 215 **array of shape (n_segments,):** Segment type labels (e.g., 'IBD1', 'IBD2'), or None if unavailable. 216 """ 217 return self.__segment_type
Retrieve segment_type.
Returns:
array of shape (n_segments,): Segment type labels (e.g., 'IBD1', 'IBD2'), or None if unavailable.
246 @property 247 def haplotype_pairs(self) -> np.ndarray: 248 """ 249 Retrieve `haplotype_pairs`. 250 251 Returns: 252 **array of shape (n_segments, 2):** Per-segment haplotype identifier pairs. 253 """ 254 return np.column_stack([self.__haplotype_id_1, self.__haplotype_id_2])
Retrieve haplotype_pairs.
Returns:
array of shape (n_segments, 2): Per-segment haplotype identifier pairs.
256 def copy(self) -> 'IBDObject': 257 """ 258 Create and return a copy of `self`. 259 260 Returns: 261 **IBDObject:** A new instance of the current object. 262 """ 263 return copy.deepcopy(self)
Create and return a copy of self.
Returns:
IBDObject: A new instance of the current object.
265 def keys(self) -> List[str]: 266 """ 267 Retrieve a list of public attribute names for `self`. 268 269 Returns: 270 **list of str:** A list of attribute names, with internal name-mangling removed. 271 """ 272 return [attr.replace('_IBDObject__', '') for attr in vars(self)]
Retrieve a list of public attribute names for self.
Returns:
list of str: A list of attribute names, with internal name-mangling removed.
274 def filter_segments( 275 self, 276 chrom: Optional[Sequence[str]] = None, 277 samples: Optional[Sequence[str]] = None, 278 min_length_cm: Optional[float] = None, 279 segment_types: Optional[Sequence[str]] = None, 280 inplace: bool = False, 281 ) -> Optional['IBDObject']: 282 """ 283 Filter IBD segments by chromosome, sample names, and/or minimum genetic length. 284 285 Args: 286 chrom (sequence of str, optional): Chromosome(s) to include. 287 samples (sequence of str, optional): Sample names to include if present in either column. 288 min_length_cm (float, optional): Minimum cM length threshold. 289 inplace (bool, default=False): If True, modifies `self` in place. If False, returns a new `IBDObject`. 290 291 Returns: 292 **Optional[IBDObject]:** A filtered IBDObject if `inplace=False`. If `inplace=True`, returns None. 293 """ 294 mask = np.ones(self.n_segments, dtype=bool) 295 296 if chrom is not None: 297 chrom = np.atleast_1d(chrom) 298 mask &= np.isin(self.__chrom, chrom) 299 300 if samples is not None: 301 samples = np.atleast_1d(samples) 302 mask &= np.isin(self.__sample_id_1, samples) | np.isin(self.__sample_id_2, samples) 303 304 if min_length_cm is not None and self.__length_cm is not None: 305 mask &= self.__length_cm >= float(min_length_cm) 306 307 if segment_types is not None and self.__segment_type is not None: 308 segment_types = np.atleast_1d(segment_types) 309 mask &= np.isin(self.__segment_type, segment_types) 310 311 def _apply_mask(x: Optional[np.ndarray]) -> Optional[np.ndarray]: 312 return None if x is None else np.asarray(x)[mask] 313 314 if inplace: 315 self.__sample_id_1 = _apply_mask(self.__sample_id_1) 316 self.__haplotype_id_1 = _apply_mask(self.__haplotype_id_1) 317 self.__sample_id_2 = _apply_mask(self.__sample_id_2) 318 self.__haplotype_id_2 = _apply_mask(self.__haplotype_id_2) 319 self.__chrom = _apply_mask(self.__chrom) 320 self.__start = _apply_mask(self.__start) 321 self.__end = _apply_mask(self.__end) 322 self.__length_cm = _apply_mask(self.__length_cm) 323 self.__segment_type = _apply_mask(self.__segment_type) 324 return None 325 else: 326 return IBDObject( 327 sample_id_1=_apply_mask(self.__sample_id_1), 328 haplotype_id_1=_apply_mask(self.__haplotype_id_1), 329 sample_id_2=_apply_mask(self.__sample_id_2), 330 haplotype_id_2=_apply_mask(self.__haplotype_id_2), 331 chrom=_apply_mask(self.__chrom), 332 start=_apply_mask(self.__start), 333 end=_apply_mask(self.__end), 334 length_cm=_apply_mask(self.__length_cm), 335 segment_type=_apply_mask(self.__segment_type), 336 )
Filter IBD segments by chromosome, sample names, and/or minimum genetic length.
Arguments:
- chrom (sequence of str, optional): Chromosome(s) to include.
- samples (sequence of str, optional): Sample names to include if present in either column.
- min_length_cm (float, optional): Minimum cM length threshold.
- inplace (bool, default=False): If True, modifies
selfin place. If False, returns a newIBDObject.
Returns:
Optional[IBDObject]: A filtered IBDObject if
inplace=False. Ifinplace=True, returns None.
338 def restrict_to_ancestry( 339 self, 340 *, 341 laiobj: Any, 342 ancestry: Any, 343 require_both_haplotypes: bool = False, 344 min_bp: Optional[int] = None, 345 min_cm: Optional[float] = None, 346 inplace: bool = False, 347 method: str = 'clip', 348 ) -> Optional['IBDObject']: 349 """ 350 Filter and/or trim IBD segments to intervals where both individuals carry the specified ancestry 351 according to a `LocalAncestryObject`. 352 353 This performs an interval intersection per segment against ancestry tracts: 354 - If haplotype IDs are known (e.g., Hap-IBD), ancestry is checked on the specific 355 haplotype of each individual. 356 - If haplotype IDs are unknown (e.g., ancIBD; haplotype_id_* == -1), ancestry is 357 considered present for an individual if at least one of their haplotypes matches 358 the requested ancestry (unless `require_both_haplotypes=True`). 359 360 Method 'strict': 361 Drop entire IBD segments if ANY overlapping LAI window contains non-target ancestry 362 for either individual. No trimming occurs - segments are kept whole or dropped completely. 363 364 Method 'clip': 365 Trim IBD segments to contiguous regions where both individuals have the target ancestry. 366 Resulting subsegments are clipped to LAI window boundaries and original IBD start/end, 367 with optional length filtering by bp or cM. 368 369 Args: 370 laiobj: LocalAncestryObject containing 2D `lai` of shape (n_windows, n_haplotypes), 371 `physical_pos` (n_windows, 2), and `chromosomes` (n_windows,). 372 ancestry: Target ancestry code or label. Compared as string, so both int and str work. 373 require_both_haplotypes: If True, require both haplotypes of each individual to have 374 the target ancestry within a window. When haplotypes are known per segment, this 375 only affects cases with unknown haplotypes (== -1) or IBD2 segments. 376 min_bp: Minimum base-pair length to retain a segment (strict) or subsegment (clip). 377 min_cm: Minimum centiMorgan length to retain a segment (strict) or subsegment (clip). 378 inplace: If True, replace `self` with the restricted object; else return a new object. 379 method: Method to use for filtering. 'strict' drops entire segments that overlap with 380 non-target ancestry. 'clip' trims segments to target ancestry regions. 381 382 Returns: 383 Optional[IBDObject]: A restricted IBDObject if `inplace=False`. If `inplace=True`, 384 returns None. 385 """ 386 if method not in ['strict', 'clip']: 387 raise ValueError(f"Method must be 'strict' or 'clip', got '{method}'") 388 389 # Basic LAI shape/metadata checks 390 lai = getattr(laiobj, 'lai', None) 391 physical_pos = getattr(laiobj, 'physical_pos', None) 392 chromosomes = getattr(laiobj, 'chromosomes', None) 393 centimorgan_pos = getattr(laiobj, 'centimorgan_pos', None) 394 haplotypes = getattr(laiobj, 'haplotypes', None) 395 396 if lai is None or physical_pos is None or chromosomes is None or haplotypes is None: 397 raise ValueError( 398 "`laiobj` must provide `lai`, `physical_pos`, `chromosomes`, and `haplotypes`." 399 ) 400 401 if lai.ndim != 2: 402 raise ValueError("`laiobj.lai` must be 2D with shape (n_windows, n_haplotypes).") 403 404 # Build haplotype label -> column index map (labels like 'Sample.0', 'Sample.1') 405 hap_to_col = {str(h): i for i, h in enumerate(haplotypes)} 406 407 # Coerce ancestry to str for robust comparisons 408 anc_str = str(ancestry) 409 410 # Coerce LAI values to str once for comparisons 411 lai_str = lai.astype(str) 412 413 # Prepare arrays for the restricted segments 414 out_sample_id_1: List[str] = [] 415 out_haplotype_id_1: List[int] = [] 416 out_sample_id_2: List[str] = [] 417 out_haplotype_id_2: List[int] = [] 418 out_chrom: List[str] = [] 419 out_start: List[int] = [] 420 out_end: List[int] = [] 421 out_length_cm: List[float] = [] 422 out_segment_type: List[str] = [] if self.__segment_type is not None else None # type: ignore 423 424 # Vectorize chrom compare by making LAI chromosome strings 425 chr_lai = np.asarray(chromosomes).astype(str) 426 427 # Helper to compute cM length for a trimmed interval using LAI windows 428 def _approx_cm_len(chr_mask: np.ndarray, start_bp: int, end_bp: int) -> Optional[float]: 429 if centimorgan_pos is None: 430 return None 431 win_st = physical_pos[chr_mask, 0] 432 win_en = physical_pos[chr_mask, 1] 433 win_cm_st = centimorgan_pos[chr_mask, 0] 434 win_cm_en = centimorgan_pos[chr_mask, 1] 435 cm_total = 0.0 436 for ws, we, cs, ce in zip(win_st, win_en, win_cm_st, win_cm_en): 437 # Overlap with [start_bp, end_bp] 438 overlap_start = max(int(ws), int(start_bp)) 439 overlap_end = min(int(we), int(end_bp)) 440 if overlap_start > overlap_end: 441 continue 442 wlen_bp = max(1, int(we) - int(ws) + 1) 443 olen_bp = int(overlap_end) - int(overlap_start) + 1 444 frac = float(olen_bp) / float(wlen_bp) 445 cm_total += frac * float(ce - cs) 446 return cm_total 447 448 # Iterate over segments 449 for i in range(self.n_segments): 450 chrom = str(self.__chrom[i]) 451 seg_start = int(self.__start[i]) 452 seg_end = int(self.__end[i]) 453 if seg_end < seg_start: 454 continue 455 456 # Subset LAI windows on this chromosome that overlap the segment 457 idx_chr = (chr_lai == chrom) 458 if not np.any(idx_chr): 459 continue 460 lai_st = physical_pos[idx_chr, 0] 461 lai_en = physical_pos[idx_chr, 1] 462 overlaps = (lai_en >= seg_start) & (lai_st <= seg_end) 463 if not np.any(overlaps): 464 continue 465 466 # Build per-window ancestry mask for both individuals 467 s1 = str(self.__sample_id_1[i]) 468 s2 = str(self.__sample_id_2[i]) 469 h1 = int(self.__haplotype_id_1[i]) if self.__haplotype_id_1 is not None else -1 470 h2 = int(self.__haplotype_id_2[i]) if self.__haplotype_id_2 is not None else -1 471 472 # Resolve haplotype column indices for each sample 473 # Known haplotypes are 1-based in inputs; convert to {0,1} 474 def _get_cols(sample: str) -> Tuple[int, int]: 475 a = hap_to_col.get(f"{sample}.0") 476 b = hap_to_col.get(f"{sample}.1") 477 if a is None or b is None: 478 raise ValueError(f"Sample '{sample}' not found in LAI haplotypes.") 479 return a, b 480 481 s1_a, s1_b = _get_cols(s1) 482 s2_a, s2_b = _get_cols(s2) 483 484 # LAI rows for this chromosome 485 lai_rows = lai_str[idx_chr, :] 486 487 # Determine ancestry presence per window for each individual 488 if h1 in (1, 2) and h2 in (1, 2): 489 # Use specific haplotypes 490 s1_col = s1_a if (h1 - 1) == 0 else s1_b 491 s2_col = s2_a if (h2 - 1) == 0 else s2_b 492 s1_mask = (lai_rows[:, s1_col] == anc_str) 493 s2_mask = (lai_rows[:, s2_col] == anc_str) 494 if require_both_haplotypes: 495 # Additionally require the other hap of each sample to match 496 s1_other = s1_b if s1_col == s1_a else s1_a 497 s2_other = s2_b if s2_col == s2_a else s2_a 498 s1_mask = s1_mask & (lai_rows[:, s1_other] == anc_str) 499 s2_mask = s2_mask & (lai_rows[:, s2_other] == anc_str) 500 else: 501 # Unknown hap IDs: require at least one hap to match (or both if requested) 502 if require_both_haplotypes: 503 s1_mask = (lai_rows[:, s1_a] == anc_str) & (lai_rows[:, s1_b] == anc_str) 504 s2_mask = (lai_rows[:, s2_a] == anc_str) & (lai_rows[:, s2_b] == anc_str) 505 else: 506 s1_mask = (lai_rows[:, s1_a] == anc_str) | (lai_rows[:, s1_b] == anc_str) 507 s2_mask = (lai_rows[:, s2_a] == anc_str) | (lai_rows[:, s2_b] == anc_str) 508 509 keep = overlaps & s1_mask & s2_mask 510 511 if method == 'strict': 512 # In strict mode, ALL overlapping windows must have target ancestry 513 if not np.array_equal(overlaps, keep): 514 continue # Drop entire segment 515 516 # Apply length filters to original segment 517 if min_bp is not None and (seg_end - seg_start + 1) < int(min_bp): 518 continue 519 520 # In strict mode, preserve original length_cm 521 cm_len = float(self.__length_cm[i]) if self.__length_cm is not None else None 522 523 if min_cm is not None: 524 if cm_len is None or cm_len < float(min_cm): 525 continue 526 527 # Keep entire original segment 528 out_sample_id_1.append(s1) 529 out_sample_id_2.append(s2) 530 out_haplotype_id_1.append(h1) 531 out_haplotype_id_2.append(h2) 532 out_chrom.append(chrom) 533 out_start.append(seg_start) 534 out_end.append(seg_end) 535 out_length_cm.append(float(cm_len) if cm_len is not None else float('nan')) 536 if out_segment_type is not None: 537 out_segment_type.append(str(self.__segment_type[i])) # type: ignore 538 539 else: # method == 'clip' 540 if not np.any(keep): 541 continue 542 543 # Identify contiguous windows where keep=True 544 idx_keep = np.where(keep)[0] 545 # Split into runs of consecutive indices 546 breaks = np.where(np.diff(idx_keep) > 1)[0] 547 run_starts = np.r_[0, breaks + 1] 548 run_ends = np.r_[breaks, idx_keep.size - 1] 549 550 # Create subsegments for each contiguous run 551 for rs, re in zip(run_starts, run_ends): 552 i0 = idx_keep[rs] 553 i1 = idx_keep[re] 554 sub_start = int(max(seg_start, int(lai_st[i0]))) 555 sub_end = int(min(seg_end, int(lai_en[i1]))) 556 if sub_end < sub_start: 557 continue 558 559 # Length filters: bp first 560 if min_bp is not None and (sub_end - sub_start + 1) < int(min_bp): 561 continue 562 563 # Compute cM length if possible, else approximate or None 564 cm_len = _approx_cm_len(idx_chr, sub_start, sub_end) 565 if cm_len is None and self.__length_cm is not None: 566 # Scale the original segment length by bp fraction 567 total_bp = max(1, int(seg_end - seg_start + 1)) 568 frac_bp = float(sub_end - sub_start + 1) / float(total_bp) 569 try: 570 cm_len = float(self.__length_cm[i]) * frac_bp 571 except Exception: 572 cm_len = None 573 574 # Apply cM filter if requested (treat None as 0) 575 if min_cm is not None: 576 if cm_len is None or cm_len < float(min_cm): 577 continue 578 579 # Append trimmed segment 580 out_sample_id_1.append(s1) 581 out_sample_id_2.append(s2) 582 out_haplotype_id_1.append(h1) 583 out_haplotype_id_2.append(h2) 584 out_chrom.append(chrom) 585 out_start.append(sub_start) 586 out_end.append(sub_end) 587 out_length_cm.append(float(cm_len) if cm_len is not None else float('nan')) 588 if out_segment_type is not None: 589 out_segment_type.append(str(self.__segment_type[i])) # type: ignore 590 591 # If nothing remains, return empty object with zero segments 592 if len(out_start) == 0: 593 # Build minimal arrays 594 empty = IBDObject( 595 sample_id_1=np.array([], dtype=object), 596 haplotype_id_1=np.array([], dtype=int), 597 sample_id_2=np.array([], dtype=object), 598 haplotype_id_2=np.array([], dtype=int), 599 chrom=np.array([], dtype=object), 600 start=np.array([], dtype=int), 601 end=np.array([], dtype=int), 602 length_cm=None, 603 segment_type=None if out_segment_type is None else np.array([], dtype=object), 604 ) 605 if inplace: 606 self.__sample_id_1 = empty.sample_id_1 607 self.__haplotype_id_1 = empty.haplotype_id_1 608 self.__sample_id_2 = empty.sample_id_2 609 self.__haplotype_id_2 = empty.haplotype_id_2 610 self.__chrom = empty.chrom 611 self.__start = empty.start 612 self.__end = empty.end 613 self.__length_cm = empty.length_cm 614 self.__segment_type = empty.segment_type 615 return None 616 return empty 617 618 # Assemble outputs 619 out_length_array: Optional[np.ndarray] 620 if len(out_length_cm) > 0: 621 # Convert NaNs to None-equivalent by using np.array with dtype float 622 out_length_array = np.asarray(out_length_cm, dtype=float) 623 else: 624 out_length_array = None 625 626 new_obj = IBDObject( 627 sample_id_1=np.asarray(out_sample_id_1, dtype=object), 628 haplotype_id_1=np.asarray(out_haplotype_id_1, dtype=int), 629 sample_id_2=np.asarray(out_sample_id_2, dtype=object), 630 haplotype_id_2=np.asarray(out_haplotype_id_2, dtype=int), 631 chrom=np.asarray(out_chrom, dtype=object), 632 start=np.asarray(out_start, dtype=int), 633 end=np.asarray(out_end, dtype=int), 634 length_cm=out_length_array, 635 segment_type=None if out_segment_type is None else np.asarray(out_segment_type, dtype=object), 636 ) 637 638 if inplace: 639 self.__sample_id_1 = new_obj.sample_id_1 640 self.__haplotype_id_1 = new_obj.haplotype_id_1 641 self.__sample_id_2 = new_obj.sample_id_2 642 self.__haplotype_id_2 = new_obj.haplotype_id_2 643 self.__chrom = new_obj.chrom 644 self.__start = new_obj.start 645 self.__end = new_obj.end 646 self.__length_cm = new_obj.length_cm 647 self.__segment_type = new_obj.segment_type 648 return None 649 return new_obj
Filter and/or trim IBD segments to intervals where both individuals carry the specified ancestry
according to a LocalAncestryObject.
This performs an interval intersection per segment against ancestry tracts:
- If haplotype IDs are known (e.g., Hap-IBD), ancestry is checked on the specific haplotype of each individual.
- If haplotype IDs are unknown (e.g., ancIBD; haplotype_id_* == -1), ancestry is
considered present for an individual if at least one of their haplotypes matches
the requested ancestry (unless
require_both_haplotypes=True).
Method 'strict': Drop entire IBD segments if ANY overlapping LAI window contains non-target ancestry for either individual. No trimming occurs - segments are kept whole or dropped completely.
Method 'clip': Trim IBD segments to contiguous regions where both individuals have the target ancestry. Resulting subsegments are clipped to LAI window boundaries and original IBD start/end, with optional length filtering by bp or cM.
Arguments:
- laiobj: LocalAncestryObject containing 2D
laiof shape (n_windows, n_haplotypes),physical_pos(n_windows, 2), andchromosomes(n_windows,). - ancestry: Target ancestry code or label. Compared as string, so both int and str work.
- require_both_haplotypes: If True, require both haplotypes of each individual to have the target ancestry within a window. When haplotypes are known per segment, this only affects cases with unknown haplotypes (== -1) or IBD2 segments.
- min_bp: Minimum base-pair length to retain a segment (strict) or subsegment (clip).
- min_cm: Minimum centiMorgan length to retain a segment (strict) or subsegment (clip).
- inplace: If True, replace
selfwith the restricted object; else return a new object. - method: Method to use for filtering. 'strict' drops entire segments that overlap with non-target ancestry. 'clip' trims segments to target ancestry regions.
Returns:
Optional[IBDObject]: A restricted IBDObject if
inplace=False. Ifinplace=True, returns None.
8def read_ibd(file: Union[str, Path], **kwargs) -> IBDObject: 9 """ 10 Automatically detect the IBD data file format from the file's extension and read it into an `IBDObject`. 11 12 Supported formats: 13 - Hap-IBD (no standard extension; defaults to tab-delimited columns without header). 14 - ancIBD (template only). 15 16 Args: 17 file (str or pathlib.Path): Path to the file to be read. 18 **kwargs: Additional arguments passed to the reader method. 19 """ 20 from snputils.ibd.io.read.auto import IBDReader 21 22 return IBDReader(file).read(**kwargs)
Automatically detect the IBD data file format from the file's extension and read it into an IBDObject.
Supported formats:
- Hap-IBD (no standard extension; defaults to tab-delimited columns without header).
- ancIBD (template only).
Arguments:
- file (str or pathlib.Path): Path to the file to be read.
- **kwargs: Additional arguments passed to the reader method.
18class HapIBDReader(IBDBaseReader): 19 """ 20 Reads an IBD file in Hap-IBD format and processes it into an `IBDObject`. 21 """ 22 23 def read(self, separator: Optional[str] = None) -> IBDObject: 24 """ 25 Read a Hap-IBD file into an `IBDObject`. 26 27 The Hap-IBD format is a delimited text without a header with columns: 28 sample_id_1, haplotype_id_1, sample_id_2, haplotype_id_2, chromosome, start, end, length_cm 29 30 Notes: 31 - Haplotype identifiers are 1-based and take values in {1, 2}. 32 33 Args: 34 separator (str, optional): Field delimiter. If None, whitespace (any number of spaces or tabs) is assumed. 35 36 Returns: 37 **IBDObject**: An IBDObject instance. 38 """ 39 log.info(f"Reading {self.file}") 40 41 # Column names for Hap-IBD files (no header present in input) 42 col_names = [ 43 'sample_id_1', 'haplotype_id_1', 'sample_id_2', 'haplotype_id_2', 44 'chrom', 'start', 'end', 'length_cm' 45 ] 46 47 # Detect gzip by extension 48 is_gz = str(self.file).endswith('.gz') 49 50 # If separator is None, treat as whitespace-delimited (any spaces or tabs) 51 if separator is None: 52 # Polars doesn't support regex separators; normalize whitespace to single tabs before parsing 53 if is_gz: 54 with gzip.open(self.file, 'rt') as f: 55 lines = [re.sub(r"\s+", "\t", line.strip()) for line in f if line.strip()] 56 else: 57 with open(self.file, 'r') as f: 58 lines = [re.sub(r"\s+", "\t", line.strip()) for line in f if line.strip()] 59 60 data = StringIO("\n".join(lines)) 61 df = pl.read_csv( 62 source=data, 63 has_header=False, 64 separator='\t', 65 new_columns=col_names, 66 schema_overrides={ 67 'sample_id_1': pl.Utf8, 68 'haplotype_id_1': pl.Int8, 69 'sample_id_2': pl.Utf8, 70 'haplotype_id_2': pl.Int8, 71 'chrom': pl.Utf8, 72 'start': pl.Int64, 73 'end': pl.Int64, 74 'length_cm': pl.Float64, 75 }, 76 ) 77 else: 78 df = pl.read_csv( 79 source=str(self.file), 80 has_header=False, 81 separator=separator, 82 new_columns=col_names, 83 schema_overrides={ 84 'sample_id_1': pl.Utf8, 85 'haplotype_id_1': pl.Int8, 86 'sample_id_2': pl.Utf8, 87 'haplotype_id_2': pl.Int8, 88 'chrom': pl.Utf8, 89 'start': pl.Int64, 90 'end': pl.Int64, 91 'length_cm': pl.Float64, 92 }, 93 ) 94 95 ibdobj = IBDObject( 96 sample_id_1=df['sample_id_1'].to_numpy(), 97 haplotype_id_1=df['haplotype_id_1'].to_numpy(), 98 sample_id_2=df['sample_id_2'].to_numpy(), 99 haplotype_id_2=df['haplotype_id_2'].to_numpy(), 100 chrom=df['chrom'].to_numpy(), 101 start=df['start'].to_numpy(), 102 end=df['end'].to_numpy(), 103 length_cm=df['length_cm'].to_numpy(), 104 segment_type=np.array(["IBD1"] * df.height), # hap-IBD does not distinguish; treat as IBD1 105 ) 106 107 log.info(f"Finished reading {self.file}") 108 109 return ibdobj
Reads an IBD file in Hap-IBD format and processes it into an IBDObject.
23 def read(self, separator: Optional[str] = None) -> IBDObject: 24 """ 25 Read a Hap-IBD file into an `IBDObject`. 26 27 The Hap-IBD format is a delimited text without a header with columns: 28 sample_id_1, haplotype_id_1, sample_id_2, haplotype_id_2, chromosome, start, end, length_cm 29 30 Notes: 31 - Haplotype identifiers are 1-based and take values in {1, 2}. 32 33 Args: 34 separator (str, optional): Field delimiter. If None, whitespace (any number of spaces or tabs) is assumed. 35 36 Returns: 37 **IBDObject**: An IBDObject instance. 38 """ 39 log.info(f"Reading {self.file}") 40 41 # Column names for Hap-IBD files (no header present in input) 42 col_names = [ 43 'sample_id_1', 'haplotype_id_1', 'sample_id_2', 'haplotype_id_2', 44 'chrom', 'start', 'end', 'length_cm' 45 ] 46 47 # Detect gzip by extension 48 is_gz = str(self.file).endswith('.gz') 49 50 # If separator is None, treat as whitespace-delimited (any spaces or tabs) 51 if separator is None: 52 # Polars doesn't support regex separators; normalize whitespace to single tabs before parsing 53 if is_gz: 54 with gzip.open(self.file, 'rt') as f: 55 lines = [re.sub(r"\s+", "\t", line.strip()) for line in f if line.strip()] 56 else: 57 with open(self.file, 'r') as f: 58 lines = [re.sub(r"\s+", "\t", line.strip()) for line in f if line.strip()] 59 60 data = StringIO("\n".join(lines)) 61 df = pl.read_csv( 62 source=data, 63 has_header=False, 64 separator='\t', 65 new_columns=col_names, 66 schema_overrides={ 67 'sample_id_1': pl.Utf8, 68 'haplotype_id_1': pl.Int8, 69 'sample_id_2': pl.Utf8, 70 'haplotype_id_2': pl.Int8, 71 'chrom': pl.Utf8, 72 'start': pl.Int64, 73 'end': pl.Int64, 74 'length_cm': pl.Float64, 75 }, 76 ) 77 else: 78 df = pl.read_csv( 79 source=str(self.file), 80 has_header=False, 81 separator=separator, 82 new_columns=col_names, 83 schema_overrides={ 84 'sample_id_1': pl.Utf8, 85 'haplotype_id_1': pl.Int8, 86 'sample_id_2': pl.Utf8, 87 'haplotype_id_2': pl.Int8, 88 'chrom': pl.Utf8, 89 'start': pl.Int64, 90 'end': pl.Int64, 91 'length_cm': pl.Float64, 92 }, 93 ) 94 95 ibdobj = IBDObject( 96 sample_id_1=df['sample_id_1'].to_numpy(), 97 haplotype_id_1=df['haplotype_id_1'].to_numpy(), 98 sample_id_2=df['sample_id_2'].to_numpy(), 99 haplotype_id_2=df['haplotype_id_2'].to_numpy(), 100 chrom=df['chrom'].to_numpy(), 101 start=df['start'].to_numpy(), 102 end=df['end'].to_numpy(), 103 length_cm=df['length_cm'].to_numpy(), 104 segment_type=np.array(["IBD1"] * df.height), # hap-IBD does not distinguish; treat as IBD1 105 ) 106 107 log.info(f"Finished reading {self.file}") 108 109 return ibdobj
Read a Hap-IBD file into an IBDObject.
The Hap-IBD format is a delimited text without a header with columns: sample_id_1, haplotype_id_1, sample_id_2, haplotype_id_2, chromosome, start, end, length_cm
Notes:
- Haplotype identifiers are 1-based and take values in {1, 2}.
Arguments:
- separator (str, optional): Field delimiter. If None, whitespace (any number of spaces or tabs) is assumed.
Returns:
IBDObject: An IBDObject instance.
17class AncIBDReader(IBDBaseReader): 18 """ 19 Reads IBD data from ancIBD outputs (TSV), accepting a file (`ch_all.tsv` or `ch*.tsv`) or a directory. 20 """ 21 22 def read( 23 self, 24 path: Optional[Union[str, Path]] = None, 25 include_segment_types: Optional[Sequence[str]] = ("IBD1", "IBD2"), 26 ) -> IBDObject: 27 """ 28 Read ancIBD outputs and convert to `IBDObject`. 29 30 Inputs accepted: 31 - A single TSV (optionally gzipped), e.g. `ch_all.tsv[.gz]` or `ch{CHR}.tsv[.gz]`. 32 - A directory containing per-chromosome TSVs or `ch_all.tsv`. 33 34 Column schema (tab-separated with header): 35 iid1, iid2, ch, Start, End, length, StartM, EndM, lengthM, StartBP, EndBP, segment_type 36 37 Notes: 38 - Haplotype indices are not provided by ancIBD; set to -1. 39 - Positions in IBDObject use base-pair StartBP/EndBP. 40 - Length uses centiMorgan as `lengthM * 100`. 41 42 Args: 43 path (str or Path, optional): Override input path. Defaults to `self.file`. 44 include_segment_types (sequence of str, optional): Filter by `segment_type` (e.g., IBD1, IBD2). None to disable. 45 46 Returns: 47 **IBDObject**: An IBDObject instance. 48 """ 49 p = Path(path) if path is not None else Path(self.file) 50 log.info(f"Reading ancIBD from {p}") 51 52 files: list[Path] 53 if p.is_dir(): 54 # Prefer combined file if present, else gather per-chromosome files 55 combined = p / "ch_all.tsv" 56 combined_gz = p / "ch_all.tsv.gz" 57 if combined.exists(): 58 files = [combined] 59 elif combined_gz.exists(): 60 files = [combined_gz] 61 else: 62 files = sorted(list(p.glob("ch*.tsv")) + list(p.glob("ch*.tsv.gz"))) 63 if not files: 64 raise FileNotFoundError("No ancIBD output files found in directory.") 65 else: 66 files = [p] 67 68 frames = [] 69 schema_overrides = { 70 "iid1": pl.Utf8, 71 "iid2": pl.Utf8, 72 "ch": pl.Utf8, 73 "Start": pl.Int64, 74 "End": pl.Int64, 75 "length": pl.Int64, # marker span; not used 76 "StartM": pl.Float64, 77 "EndM": pl.Float64, 78 "lengthM": pl.Float64, 79 "StartBP": pl.Int64, 80 "EndBP": pl.Int64, 81 "segment_type": pl.Utf8, 82 } 83 84 for f in files: 85 frame = pl.read_csv(str(f), separator="\t", has_header=True, schema_overrides=schema_overrides) 86 frames.append(frame) 87 88 df = pl.concat(frames, how="vertical") if len(frames) > 1 else frames[0] 89 90 if include_segment_types is not None: 91 df = df.filter(pl.col("segment_type").is_in(list(include_segment_types))) 92 93 # Map columns to IBDObject schema 94 sample_id_1 = df["iid1"].to_numpy() 95 sample_id_2 = df["iid2"].to_numpy() 96 chrom = df["ch"].to_numpy() 97 start_bp = df["StartBP"].to_numpy() 98 end_bp = df["EndBP"].to_numpy() 99 length_cm = (df["lengthM"] * 100.0).to_numpy() 100 101 # ancIBD doesn't include haplotype indices; set to -1 102 hap1 = np.full(sample_id_1.shape[0], -1, dtype=np.int8) 103 hap2 = np.full(sample_id_2.shape[0], -1, dtype=np.int8) 104 105 ibdobj = IBDObject( 106 sample_id_1=sample_id_1, 107 haplotype_id_1=hap1, 108 sample_id_2=sample_id_2, 109 haplotype_id_2=hap2, 110 chrom=chrom, 111 start=start_bp, 112 end=end_bp, 113 length_cm=length_cm, 114 segment_type=df["segment_type"].to_numpy(), 115 ) 116 117 log.info(f"Finished reading ancIBD from {p}") 118 return ibdobj
Reads IBD data from ancIBD outputs (TSV), accepting a file (ch_all.tsv or ch*.tsv) or a directory.
22 def read( 23 self, 24 path: Optional[Union[str, Path]] = None, 25 include_segment_types: Optional[Sequence[str]] = ("IBD1", "IBD2"), 26 ) -> IBDObject: 27 """ 28 Read ancIBD outputs and convert to `IBDObject`. 29 30 Inputs accepted: 31 - A single TSV (optionally gzipped), e.g. `ch_all.tsv[.gz]` or `ch{CHR}.tsv[.gz]`. 32 - A directory containing per-chromosome TSVs or `ch_all.tsv`. 33 34 Column schema (tab-separated with header): 35 iid1, iid2, ch, Start, End, length, StartM, EndM, lengthM, StartBP, EndBP, segment_type 36 37 Notes: 38 - Haplotype indices are not provided by ancIBD; set to -1. 39 - Positions in IBDObject use base-pair StartBP/EndBP. 40 - Length uses centiMorgan as `lengthM * 100`. 41 42 Args: 43 path (str or Path, optional): Override input path. Defaults to `self.file`. 44 include_segment_types (sequence of str, optional): Filter by `segment_type` (e.g., IBD1, IBD2). None to disable. 45 46 Returns: 47 **IBDObject**: An IBDObject instance. 48 """ 49 p = Path(path) if path is not None else Path(self.file) 50 log.info(f"Reading ancIBD from {p}") 51 52 files: list[Path] 53 if p.is_dir(): 54 # Prefer combined file if present, else gather per-chromosome files 55 combined = p / "ch_all.tsv" 56 combined_gz = p / "ch_all.tsv.gz" 57 if combined.exists(): 58 files = [combined] 59 elif combined_gz.exists(): 60 files = [combined_gz] 61 else: 62 files = sorted(list(p.glob("ch*.tsv")) + list(p.glob("ch*.tsv.gz"))) 63 if not files: 64 raise FileNotFoundError("No ancIBD output files found in directory.") 65 else: 66 files = [p] 67 68 frames = [] 69 schema_overrides = { 70 "iid1": pl.Utf8, 71 "iid2": pl.Utf8, 72 "ch": pl.Utf8, 73 "Start": pl.Int64, 74 "End": pl.Int64, 75 "length": pl.Int64, # marker span; not used 76 "StartM": pl.Float64, 77 "EndM": pl.Float64, 78 "lengthM": pl.Float64, 79 "StartBP": pl.Int64, 80 "EndBP": pl.Int64, 81 "segment_type": pl.Utf8, 82 } 83 84 for f in files: 85 frame = pl.read_csv(str(f), separator="\t", has_header=True, schema_overrides=schema_overrides) 86 frames.append(frame) 87 88 df = pl.concat(frames, how="vertical") if len(frames) > 1 else frames[0] 89 90 if include_segment_types is not None: 91 df = df.filter(pl.col("segment_type").is_in(list(include_segment_types))) 92 93 # Map columns to IBDObject schema 94 sample_id_1 = df["iid1"].to_numpy() 95 sample_id_2 = df["iid2"].to_numpy() 96 chrom = df["ch"].to_numpy() 97 start_bp = df["StartBP"].to_numpy() 98 end_bp = df["EndBP"].to_numpy() 99 length_cm = (df["lengthM"] * 100.0).to_numpy() 100 101 # ancIBD doesn't include haplotype indices; set to -1 102 hap1 = np.full(sample_id_1.shape[0], -1, dtype=np.int8) 103 hap2 = np.full(sample_id_2.shape[0], -1, dtype=np.int8) 104 105 ibdobj = IBDObject( 106 sample_id_1=sample_id_1, 107 haplotype_id_1=hap1, 108 sample_id_2=sample_id_2, 109 haplotype_id_2=hap2, 110 chrom=chrom, 111 start=start_bp, 112 end=end_bp, 113 length_cm=length_cm, 114 segment_type=df["segment_type"].to_numpy(), 115 ) 116 117 log.info(f"Finished reading ancIBD from {p}") 118 return ibdobj
Read ancIBD outputs and convert to IBDObject.
Inputs accepted:
- A single TSV (optionally gzipped), e.g.
ch_all.tsv[.gz]orch{CHR}.tsv[.gz]. - A directory containing per-chromosome TSVs or
ch_all.tsv.
Column schema (tab-separated with header): iid1, iid2, ch, Start, End, length, StartM, EndM, lengthM, StartBP, EndBP, segment_type
Notes:
- Haplotype indices are not provided by ancIBD; set to -1.
- Positions in IBDObject use base-pair StartBP/EndBP.
- Length uses centiMorgan as
lengthM * 100.
Arguments:
- path (str or Path, optional): Override input path. Defaults to
self.file. - include_segment_types (sequence of str, optional): Filter by
segment_type(e.g., IBD1, IBD2). None to disable.
Returns:
IBDObject: An IBDObject instance.
8class IBDReader: 9 def __new__( 10 cls, 11 file: Union[str, Path] 12 ) -> object: 13 """ 14 A factory class that attempts to detect the IBD file format and returns the corresponding reader. 15 16 Supported detections: 17 - Hap-IBD: *.ibd or *.ibd.gz (headerless, 8 columns) 18 - ancIBD: directories with `ch_all.tsv`/`ch*.tsv` or files *.tsv / *.tsv.gz with ancIBD schema 19 """ 20 file = Path(file) 21 suffixes = [s.lower() for s in file.suffixes] 22 23 # Directory-based detection for ancIBD 24 if file.is_dir(): 25 if (file / 'ch_all.tsv').exists() or (file / 'ch_all.tsv.gz').exists(): 26 from snputils.ibd.io.read.anc_ibd import AncIBDReader 27 return AncIBDReader(file) 28 has_chr_files = list(file.glob('ch*.tsv')) or list(file.glob('ch*.tsv.gz')) 29 if has_chr_files: 30 from snputils.ibd.io.read.anc_ibd import AncIBDReader 31 return AncIBDReader(file) 32 # Fallback to HapIBD if nothing matches 33 from snputils.ibd.io.read.hap_ibd import HapIBDReader 34 return HapIBDReader(file) 35 36 # File-based detection 37 if suffixes[-2:] == ['.ibd', '.gz'] or suffixes[-1:] == ['.ibd']: 38 from snputils.ibd.io.read.hap_ibd import HapIBDReader 39 return HapIBDReader(file) 40 if suffixes[-2:] == ['.tsv', '.gz'] or suffixes[-1:] == ['.tsv']: 41 from snputils.ibd.io.read.anc_ibd import AncIBDReader 42 return AncIBDReader(file) 43 44 # Default to HapIBDReader (most tools use .ibd[.gz]) 45 from snputils.ibd.io.read.hap_ibd import HapIBDReader 46 return HapIBDReader(file)
9 def __new__( 10 cls, 11 file: Union[str, Path] 12 ) -> object: 13 """ 14 A factory class that attempts to detect the IBD file format and returns the corresponding reader. 15 16 Supported detections: 17 - Hap-IBD: *.ibd or *.ibd.gz (headerless, 8 columns) 18 - ancIBD: directories with `ch_all.tsv`/`ch*.tsv` or files *.tsv / *.tsv.gz with ancIBD schema 19 """ 20 file = Path(file) 21 suffixes = [s.lower() for s in file.suffixes] 22 23 # Directory-based detection for ancIBD 24 if file.is_dir(): 25 if (file / 'ch_all.tsv').exists() or (file / 'ch_all.tsv.gz').exists(): 26 from snputils.ibd.io.read.anc_ibd import AncIBDReader 27 return AncIBDReader(file) 28 has_chr_files = list(file.glob('ch*.tsv')) or list(file.glob('ch*.tsv.gz')) 29 if has_chr_files: 30 from snputils.ibd.io.read.anc_ibd import AncIBDReader 31 return AncIBDReader(file) 32 # Fallback to HapIBD if nothing matches 33 from snputils.ibd.io.read.hap_ibd import HapIBDReader 34 return HapIBDReader(file) 35 36 # File-based detection 37 if suffixes[-2:] == ['.ibd', '.gz'] or suffixes[-1:] == ['.ibd']: 38 from snputils.ibd.io.read.hap_ibd import HapIBDReader 39 return HapIBDReader(file) 40 if suffixes[-2:] == ['.tsv', '.gz'] or suffixes[-1:] == ['.tsv']: 41 from snputils.ibd.io.read.anc_ibd import AncIBDReader 42 return AncIBDReader(file) 43 44 # Default to HapIBDReader (most tools use .ibd[.gz]) 45 from snputils.ibd.io.read.hap_ibd import HapIBDReader 46 return HapIBDReader(file)
A factory class that attempts to detect the IBD file format and returns the corresponding reader.
Supported detections:
- Hap-IBD: *.ibd or *.ibd.gz (headerless, 8 columns)
- ancIBD: directories with
ch_all.tsv/ch*.tsvor files *.tsv / *.tsv.gz with ancIBD schema