snputils.ibd

1from .genobj.ibdobj import IBDObject
2from .io import read_ibd, HapIBDReader, AncIBDReader, IBDReader
3
4__all__ = ['IBDObject', 'read_ibd', 'HapIBDReader', 'AncIBDReader', 'IBDReader']
class IBDObject:
 12class IBDObject:
 13    """
 14    A class for Identity-By-Descent (IBD) segment data.
 15    """
 16
 17    def __init__(
 18        self,
 19        sample_id_1: np.ndarray,
 20        haplotype_id_1: np.ndarray,
 21        sample_id_2: np.ndarray,
 22        haplotype_id_2: np.ndarray,
 23        chrom: np.ndarray,
 24        start: np.ndarray,
 25        end: np.ndarray,
 26        length_cm: Optional[np.ndarray] = None,
 27        segment_type: Optional[np.ndarray] = None,
 28    ) -> None:
 29        """
 30        Args:
 31            sample_id_1 (array of shape (n_segments,)): Sample identifiers for the first individual.
 32            haplotype_id_1 (array of shape (n_segments,)): Haplotype identifiers for the first individual (values in {1, 2}, or -1 if unknown).
 33            sample_id_2 (array of shape (n_segments,)): Sample identifiers for the second individual.
 34            haplotype_id_2 (array of shape (n_segments,)): Haplotype identifiers for the second individual (values in {1, 2}, or -1 if unknown).
 35            chrom (array of shape (n_segments,)): Chromosome identifier for each IBD segment.
 36            start (array of shape (n_segments,)): Start physical position (1-based, bp) for each IBD segment.
 37            end (array of shape (n_segments,)): End physical position (1-based, bp) for each IBD segment.
 38            length_cm (array of shape (n_segments,), optional): Genetic length (cM) for each segment, if available.
 39        """
 40        # Store attributes
 41        self.__sample_id_1 = np.asarray(sample_id_1)
 42        self.__haplotype_id_1 = np.asarray(haplotype_id_1)
 43        self.__sample_id_2 = np.asarray(sample_id_2)
 44        self.__haplotype_id_2 = np.asarray(haplotype_id_2)
 45        self.__chrom = np.asarray(chrom)
 46        self.__start = np.asarray(start)
 47        self.__end = np.asarray(end)
 48        self.__length_cm = None if length_cm is None else np.asarray(length_cm)
 49        self.__segment_type = None if segment_type is None else np.asarray(segment_type)
 50
 51        self._sanity_check()
 52
 53    def __getitem__(self, key: str) -> Any:
 54        """
 55        To access an attribute of the class using the square bracket notation,
 56        similar to a dictionary.
 57        """
 58        try:
 59            return getattr(self, key)
 60        except Exception:
 61            raise KeyError(f"Invalid key: {key}.")
 62
 63    def __setitem__(self, key: str, value: Any) -> None:
 64        """
 65        To set an attribute of the class using the square bracket notation,
 66        similar to a dictionary.
 67        """
 68        try:
 69            setattr(self, key, value)
 70        except Exception:
 71            raise KeyError(f"Invalid key: {key}.")
 72
 73    @property
 74    def sample_id_1(self) -> np.ndarray:
 75        """
 76        Retrieve `sample_id_1`.
 77
 78        Returns:
 79            **array of shape (n_segments,):** Sample identifiers for the first individual.
 80        """
 81        return self.__sample_id_1
 82
 83    @sample_id_1.setter
 84    def sample_id_1(self, x: Sequence) -> None:
 85        """
 86        Update `sample_id_1`.
 87        """
 88        self.__sample_id_1 = np.asarray(x)
 89
 90    @property
 91    def haplotype_id_1(self) -> np.ndarray:
 92        """
 93        Retrieve `haplotype_id_1`.
 94
 95        Returns:
 96            **array of shape (n_segments,):** Haplotype identifiers for the first individual (values in {1, 2}).
 97        """
 98        return self.__haplotype_id_1
 99
100    @haplotype_id_1.setter
101    def haplotype_id_1(self, x: Sequence) -> None:
102        """
103        Update `haplotype_id_1`.
104        """
105        self.__haplotype_id_1 = np.asarray(x)
106
107    @property
108    def sample_id_2(self) -> np.ndarray:
109        """
110        Retrieve `sample_id_2`.
111
112        Returns:
113            **array of shape (n_segments,):** Sample identifiers for the second individual.
114        """
115        return self.__sample_id_2
116
117    @sample_id_2.setter
118    def sample_id_2(self, x: Sequence) -> None:
119        """
120        Update `sample_id_2`.
121        """
122        self.__sample_id_2 = np.asarray(x)
123
124    @property
125    def haplotype_id_2(self) -> np.ndarray:
126        """
127        Retrieve `haplotype_id_2`.
128
129        Returns:
130            **array of shape (n_segments,):** Haplotype identifiers for the second individual (values in {1, 2}).
131        """
132        return self.__haplotype_id_2
133
134    @haplotype_id_2.setter
135    def haplotype_id_2(self, x: Sequence) -> None:
136        """
137        Update `haplotype_id_2`.
138        """
139        self.__haplotype_id_2 = np.asarray(x)
140
141    @property
142    def chrom(self) -> np.ndarray:
143        """
144        Retrieve `chrom`.
145
146        Returns:
147            **array of shape (n_segments,):** Chromosome identifier for each IBD segment.
148        """
149        return self.__chrom
150
151    @chrom.setter
152    def chrom(self, x: Sequence) -> None:
153        """
154        Update `chrom`.
155        """
156        self.__chrom = np.asarray(x)
157
158    @property
159    def start(self) -> np.ndarray:
160        """
161        Retrieve `start`.
162
163        Returns:
164            **array of shape (n_segments,):** Start physical position (1-based, bp) for each IBD segment.
165        """
166        return self.__start
167
168    @start.setter
169    def start(self, x: Sequence) -> None:
170        """
171        Update `start`.
172        """
173        self.__start = np.asarray(x)
174
175    @property
176    def end(self) -> np.ndarray:
177        """
178        Retrieve `end`.
179
180        Returns:
181            **array of shape (n_segments,):** End physical position (1-based, bp) for each IBD segment.
182        """
183        return self.__end
184
185    @end.setter
186    def end(self, x: Sequence) -> None:
187        """
188        Update `end`.
189        """
190        self.__end = np.asarray(x)
191
192    @property
193    def length_cm(self) -> Optional[np.ndarray]:
194        """
195        Retrieve `length_cm`.
196
197        Returns:
198            **array of shape (n_segments,):** Genetic length (cM) for each segment if available; otherwise None.
199        """
200        return self.__length_cm
201
202    @length_cm.setter
203    def length_cm(self, x: Optional[Sequence]) -> None:
204        """
205        Update `length_cm`.
206        """
207        self.__length_cm = None if x is None else np.asarray(x)
208
209    @property
210    def segment_type(self) -> Optional[np.ndarray]:
211        """
212        Retrieve `segment_type`.
213
214        Returns:
215            **array of shape (n_segments,):** Segment type labels (e.g., 'IBD1', 'IBD2'), or None if unavailable.
216        """
217        return self.__segment_type
218
219    @segment_type.setter
220    def segment_type(self, x: Optional[Sequence]) -> None:
221        """
222        Update `segment_type`.
223        """
224        self.__segment_type = None if x is None else np.asarray(x)
225
226    @property
227    def n_segments(self) -> int:
228        """
229        Retrieve `n_segments`.
230
231        Returns:
232            **int:** The total number of IBD segments.
233        """
234        return self.__chrom.shape[0]
235
236    @property
237    def pairs(self) -> np.ndarray:
238        """
239        Retrieve `pairs`.
240
241        Returns:
242            **array of shape (n_segments, 2):** Per-segment sample identifier pairs.
243        """
244        return np.column_stack([self.__sample_id_1, self.__sample_id_2])
245
246    @property
247    def haplotype_pairs(self) -> np.ndarray:
248        """
249        Retrieve `haplotype_pairs`.
250
251        Returns:
252            **array of shape (n_segments, 2):** Per-segment haplotype identifier pairs.
253        """
254        return np.column_stack([self.__haplotype_id_1, self.__haplotype_id_2])
255
256    def copy(self) -> 'IBDObject':
257        """
258        Create and return a copy of `self`.
259
260        Returns:
261            **IBDObject:** A new instance of the current object.
262        """
263        return copy.deepcopy(self)
264
265    def keys(self) -> List[str]:
266        """
267        Retrieve a list of public attribute names for `self`.
268
269        Returns:
270            **list of str:** A list of attribute names, with internal name-mangling removed.
271        """
272        return [attr.replace('_IBDObject__', '') for attr in vars(self)]
273
274    def filter_segments(
275        self,
276        chrom: Optional[Sequence[str]] = None,
277        samples: Optional[Sequence[str]] = None,
278        min_length_cm: Optional[float] = None,
279        segment_types: Optional[Sequence[str]] = None,
280        inplace: bool = False,
281    ) -> Optional['IBDObject']:
282        """
283        Filter IBD segments by chromosome, sample names, and/or minimum genetic length.
284
285        Args:
286            chrom (sequence of str, optional): Chromosome(s) to include.
287            samples (sequence of str, optional): Sample names to include if present in either column.
288            min_length_cm (float, optional): Minimum cM length threshold.
289            inplace (bool, default=False): If True, modifies `self` in place. If False, returns a new `IBDObject`.
290
291        Returns:
292            **Optional[IBDObject]:** A filtered IBDObject if `inplace=False`. If `inplace=True`, returns None.
293        """
294        mask = np.ones(self.n_segments, dtype=bool)
295
296        if chrom is not None:
297            chrom = np.atleast_1d(chrom)
298            mask &= np.isin(self.__chrom, chrom)
299
300        if samples is not None:
301            samples = np.atleast_1d(samples)
302            mask &= np.isin(self.__sample_id_1, samples) | np.isin(self.__sample_id_2, samples)
303
304        if min_length_cm is not None and self.__length_cm is not None:
305            mask &= self.__length_cm >= float(min_length_cm)
306
307        if segment_types is not None and self.__segment_type is not None:
308            segment_types = np.atleast_1d(segment_types)
309            mask &= np.isin(self.__segment_type, segment_types)
310
311        def _apply_mask(x: Optional[np.ndarray]) -> Optional[np.ndarray]:
312            return None if x is None else np.asarray(x)[mask]
313
314        if inplace:
315            self.__sample_id_1 = _apply_mask(self.__sample_id_1)
316            self.__haplotype_id_1 = _apply_mask(self.__haplotype_id_1)
317            self.__sample_id_2 = _apply_mask(self.__sample_id_2)
318            self.__haplotype_id_2 = _apply_mask(self.__haplotype_id_2)
319            self.__chrom = _apply_mask(self.__chrom)
320            self.__start = _apply_mask(self.__start)
321            self.__end = _apply_mask(self.__end)
322            self.__length_cm = _apply_mask(self.__length_cm)
323            self.__segment_type = _apply_mask(self.__segment_type)
324            return None
325        else:
326            return IBDObject(
327                sample_id_1=_apply_mask(self.__sample_id_1),
328                haplotype_id_1=_apply_mask(self.__haplotype_id_1),
329                sample_id_2=_apply_mask(self.__sample_id_2),
330                haplotype_id_2=_apply_mask(self.__haplotype_id_2),
331                chrom=_apply_mask(self.__chrom),
332                start=_apply_mask(self.__start),
333                end=_apply_mask(self.__end),
334                length_cm=_apply_mask(self.__length_cm),
335                segment_type=_apply_mask(self.__segment_type),
336            )
337
338    def restrict_to_ancestry(
339        self,
340        *,
341        laiobj: Any,
342        ancestry: Any,
343        require_both_haplotypes: bool = False,
344        min_bp: Optional[int] = None,
345        min_cm: Optional[float] = None,
346        inplace: bool = False,
347        method: str = 'clip',
348    ) -> Optional['IBDObject']:
349        """
350        Filter and/or trim IBD segments to intervals where both individuals carry the specified ancestry
351        according to a `LocalAncestryObject`.
352
353        This performs an interval intersection per segment against ancestry tracts:
354        - If haplotype IDs are known (e.g., Hap-IBD), ancestry is checked on the specific
355          haplotype of each individual.
356        - If haplotype IDs are unknown (e.g., ancIBD; haplotype_id_* == -1), ancestry is
357          considered present for an individual if at least one of their haplotypes matches
358          the requested ancestry (unless `require_both_haplotypes=True`).
359
360        Method 'strict':
361            Drop entire IBD segments if ANY overlapping LAI window contains non-target ancestry
362            for either individual. No trimming occurs - segments are kept whole or dropped completely.
363
364        Method 'clip':
365            Trim IBD segments to contiguous regions where both individuals have the target ancestry.
366            Resulting subsegments are clipped to LAI window boundaries and original IBD start/end,
367            with optional length filtering by bp or cM.
368
369        Args:
370            laiobj: LocalAncestryObject containing 2D `lai` of shape (n_windows, n_haplotypes),
371                `physical_pos` (n_windows, 2), and `chromosomes` (n_windows,).
372            ancestry: Target ancestry code or label. Compared as string, so both int and str work.
373            require_both_haplotypes: If True, require both haplotypes of each individual to have
374                the target ancestry within a window. When haplotypes are known per segment, this
375                only affects cases with unknown haplotypes (== -1) or IBD2 segments.
376            min_bp: Minimum base-pair length to retain a segment (strict) or subsegment (clip).
377            min_cm: Minimum centiMorgan length to retain a segment (strict) or subsegment (clip).
378            inplace: If True, replace `self` with the restricted object; else return a new object.
379            method: Method to use for filtering. 'strict' drops entire segments that overlap with
380                non-target ancestry. 'clip' trims segments to target ancestry regions.
381
382        Returns:
383            Optional[IBDObject]: A restricted IBDObject if `inplace=False`. If `inplace=True`,
384                returns None.
385        """
386        if method not in ['strict', 'clip']:
387            raise ValueError(f"Method must be 'strict' or 'clip', got '{method}'")
388
389        # Basic LAI shape/metadata checks
390        lai = getattr(laiobj, 'lai', None)
391        physical_pos = getattr(laiobj, 'physical_pos', None)
392        chromosomes = getattr(laiobj, 'chromosomes', None)
393        centimorgan_pos = getattr(laiobj, 'centimorgan_pos', None)
394        haplotypes = getattr(laiobj, 'haplotypes', None)
395
396        if lai is None or physical_pos is None or chromosomes is None or haplotypes is None:
397            raise ValueError(
398                "`laiobj` must provide `lai`, `physical_pos`, `chromosomes`, and `haplotypes`."
399            )
400
401        if lai.ndim != 2:
402            raise ValueError("`laiobj.lai` must be 2D with shape (n_windows, n_haplotypes).")
403
404        # Build haplotype label -> column index map (labels like 'Sample.0', 'Sample.1')
405        hap_to_col = {str(h): i for i, h in enumerate(haplotypes)}
406
407        # Coerce ancestry to str for robust comparisons
408        anc_str = str(ancestry)
409
410        # Coerce LAI values to str once for comparisons
411        lai_str = lai.astype(str)
412
413        # Prepare arrays for the restricted segments
414        out_sample_id_1: List[str] = []
415        out_haplotype_id_1: List[int] = []
416        out_sample_id_2: List[str] = []
417        out_haplotype_id_2: List[int] = []
418        out_chrom: List[str] = []
419        out_start: List[int] = []
420        out_end: List[int] = []
421        out_length_cm: List[float] = []
422        out_segment_type: List[str] = [] if self.__segment_type is not None else None  # type: ignore
423
424        # Vectorize chrom compare by making LAI chromosome strings
425        chr_lai = np.asarray(chromosomes).astype(str)
426
427        # Helper to compute cM length for a trimmed interval using LAI windows
428        def _approx_cm_len(chr_mask: np.ndarray, start_bp: int, end_bp: int) -> Optional[float]:
429            if centimorgan_pos is None:
430                return None
431            win_st = physical_pos[chr_mask, 0]
432            win_en = physical_pos[chr_mask, 1]
433            win_cm_st = centimorgan_pos[chr_mask, 0]
434            win_cm_en = centimorgan_pos[chr_mask, 1]
435            cm_total = 0.0
436            for ws, we, cs, ce in zip(win_st, win_en, win_cm_st, win_cm_en):
437                # Overlap with [start_bp, end_bp]
438                overlap_start = max(int(ws), int(start_bp))
439                overlap_end = min(int(we), int(end_bp))
440                if overlap_start > overlap_end:
441                    continue
442                wlen_bp = max(1, int(we) - int(ws) + 1)
443                olen_bp = int(overlap_end) - int(overlap_start) + 1
444                frac = float(olen_bp) / float(wlen_bp)
445                cm_total += frac * float(ce - cs)
446            return cm_total
447
448        # Iterate over segments
449        for i in range(self.n_segments):
450            chrom = str(self.__chrom[i])
451            seg_start = int(self.__start[i])
452            seg_end = int(self.__end[i])
453            if seg_end < seg_start:
454                continue
455
456            # Subset LAI windows on this chromosome that overlap the segment
457            idx_chr = (chr_lai == chrom)
458            if not np.any(idx_chr):
459                continue
460            lai_st = physical_pos[idx_chr, 0]
461            lai_en = physical_pos[idx_chr, 1]
462            overlaps = (lai_en >= seg_start) & (lai_st <= seg_end)
463            if not np.any(overlaps):
464                continue
465
466            # Build per-window ancestry mask for both individuals
467            s1 = str(self.__sample_id_1[i])
468            s2 = str(self.__sample_id_2[i])
469            h1 = int(self.__haplotype_id_1[i]) if self.__haplotype_id_1 is not None else -1
470            h2 = int(self.__haplotype_id_2[i]) if self.__haplotype_id_2 is not None else -1
471
472            # Resolve haplotype column indices for each sample
473            # Known haplotypes are 1-based in inputs; convert to {0,1}
474            def _get_cols(sample: str) -> Tuple[int, int]:
475                a = hap_to_col.get(f"{sample}.0")
476                b = hap_to_col.get(f"{sample}.1")
477                if a is None or b is None:
478                    raise ValueError(f"Sample '{sample}' not found in LAI haplotypes.")
479                return a, b
480
481            s1_a, s1_b = _get_cols(s1)
482            s2_a, s2_b = _get_cols(s2)
483
484            # LAI rows for this chromosome
485            lai_rows = lai_str[idx_chr, :]
486
487            # Determine ancestry presence per window for each individual
488            if h1 in (1, 2) and h2 in (1, 2):
489                # Use specific haplotypes
490                s1_col = s1_a if (h1 - 1) == 0 else s1_b
491                s2_col = s2_a if (h2 - 1) == 0 else s2_b
492                s1_mask = (lai_rows[:, s1_col] == anc_str)
493                s2_mask = (lai_rows[:, s2_col] == anc_str)
494                if require_both_haplotypes:
495                    # Additionally require the other hap of each sample to match
496                    s1_other = s1_b if s1_col == s1_a else s1_a
497                    s2_other = s2_b if s2_col == s2_a else s2_a
498                    s1_mask = s1_mask & (lai_rows[:, s1_other] == anc_str)
499                    s2_mask = s2_mask & (lai_rows[:, s2_other] == anc_str)
500            else:
501                # Unknown hap IDs: require at least one hap to match (or both if requested)
502                if require_both_haplotypes:
503                    s1_mask = (lai_rows[:, s1_a] == anc_str) & (lai_rows[:, s1_b] == anc_str)
504                    s2_mask = (lai_rows[:, s2_a] == anc_str) & (lai_rows[:, s2_b] == anc_str)
505                else:
506                    s1_mask = (lai_rows[:, s1_a] == anc_str) | (lai_rows[:, s1_b] == anc_str)
507                    s2_mask = (lai_rows[:, s2_a] == anc_str) | (lai_rows[:, s2_b] == anc_str)
508
509            keep = overlaps & s1_mask & s2_mask
510
511            if method == 'strict':
512                # In strict mode, ALL overlapping windows must have target ancestry
513                if not np.array_equal(overlaps, keep):
514                    continue  # Drop entire segment
515
516                # Apply length filters to original segment
517                if min_bp is not None and (seg_end - seg_start + 1) < int(min_bp):
518                    continue
519
520                # In strict mode, preserve original length_cm
521                cm_len = float(self.__length_cm[i]) if self.__length_cm is not None else None
522
523                if min_cm is not None:
524                    if cm_len is None or cm_len < float(min_cm):
525                        continue
526
527                # Keep entire original segment
528                out_sample_id_1.append(s1)
529                out_sample_id_2.append(s2)
530                out_haplotype_id_1.append(h1)
531                out_haplotype_id_2.append(h2)
532                out_chrom.append(chrom)
533                out_start.append(seg_start)
534                out_end.append(seg_end)
535                out_length_cm.append(float(cm_len) if cm_len is not None else float('nan'))
536                if out_segment_type is not None:
537                    out_segment_type.append(str(self.__segment_type[i]))  # type: ignore
538
539            else:  # method == 'clip'
540                if not np.any(keep):
541                    continue
542
543                # Identify contiguous windows where keep=True
544                idx_keep = np.where(keep)[0]
545                # Split into runs of consecutive indices
546                breaks = np.where(np.diff(idx_keep) > 1)[0]
547                run_starts = np.r_[0, breaks + 1]
548                run_ends = np.r_[breaks, idx_keep.size - 1]
549
550                # Create subsegments for each contiguous run
551                for rs, re in zip(run_starts, run_ends):
552                    i0 = idx_keep[rs]
553                    i1 = idx_keep[re]
554                    sub_start = int(max(seg_start, int(lai_st[i0])))
555                    sub_end = int(min(seg_end, int(lai_en[i1])))
556                    if sub_end < sub_start:
557                        continue
558
559                    # Length filters: bp first
560                    if min_bp is not None and (sub_end - sub_start + 1) < int(min_bp):
561                        continue
562
563                    # Compute cM length if possible, else approximate or None
564                    cm_len = _approx_cm_len(idx_chr, sub_start, sub_end)
565                    if cm_len is None and self.__length_cm is not None:
566                        # Scale the original segment length by bp fraction
567                        total_bp = max(1, int(seg_end - seg_start + 1))
568                        frac_bp = float(sub_end - sub_start + 1) / float(total_bp)
569                        try:
570                            cm_len = float(self.__length_cm[i]) * frac_bp
571                        except Exception:
572                            cm_len = None
573
574                    # Apply cM filter if requested (treat None as 0)
575                    if min_cm is not None:
576                        if cm_len is None or cm_len < float(min_cm):
577                            continue
578
579                    # Append trimmed segment
580                    out_sample_id_1.append(s1)
581                    out_sample_id_2.append(s2)
582                    out_haplotype_id_1.append(h1)
583                    out_haplotype_id_2.append(h2)
584                    out_chrom.append(chrom)
585                    out_start.append(sub_start)
586                    out_end.append(sub_end)
587                    out_length_cm.append(float(cm_len) if cm_len is not None else float('nan'))
588                    if out_segment_type is not None:
589                        out_segment_type.append(str(self.__segment_type[i]))  # type: ignore
590
591        # If nothing remains, return empty object with zero segments
592        if len(out_start) == 0:
593            # Build minimal arrays
594            empty = IBDObject(
595                sample_id_1=np.array([], dtype=object),
596                haplotype_id_1=np.array([], dtype=int),
597                sample_id_2=np.array([], dtype=object),
598                haplotype_id_2=np.array([], dtype=int),
599                chrom=np.array([], dtype=object),
600                start=np.array([], dtype=int),
601                end=np.array([], dtype=int),
602                length_cm=None,
603                segment_type=None if out_segment_type is None else np.array([], dtype=object),
604            )
605            if inplace:
606                self.__sample_id_1 = empty.sample_id_1
607                self.__haplotype_id_1 = empty.haplotype_id_1
608                self.__sample_id_2 = empty.sample_id_2
609                self.__haplotype_id_2 = empty.haplotype_id_2
610                self.__chrom = empty.chrom
611                self.__start = empty.start
612                self.__end = empty.end
613                self.__length_cm = empty.length_cm
614                self.__segment_type = empty.segment_type
615                return None
616            return empty
617
618        # Assemble outputs
619        out_length_array: Optional[np.ndarray]
620        if len(out_length_cm) > 0:
621            # Convert NaNs to None-equivalent by using np.array with dtype float
622            out_length_array = np.asarray(out_length_cm, dtype=float)
623        else:
624            out_length_array = None
625
626        new_obj = IBDObject(
627            sample_id_1=np.asarray(out_sample_id_1, dtype=object),
628            haplotype_id_1=np.asarray(out_haplotype_id_1, dtype=int),
629            sample_id_2=np.asarray(out_sample_id_2, dtype=object),
630            haplotype_id_2=np.asarray(out_haplotype_id_2, dtype=int),
631            chrom=np.asarray(out_chrom, dtype=object),
632            start=np.asarray(out_start, dtype=int),
633            end=np.asarray(out_end, dtype=int),
634            length_cm=out_length_array,
635            segment_type=None if out_segment_type is None else np.asarray(out_segment_type, dtype=object),
636        )
637
638        if inplace:
639            self.__sample_id_1 = new_obj.sample_id_1
640            self.__haplotype_id_1 = new_obj.haplotype_id_1
641            self.__sample_id_2 = new_obj.sample_id_2
642            self.__haplotype_id_2 = new_obj.haplotype_id_2
643            self.__chrom = new_obj.chrom
644            self.__start = new_obj.start
645            self.__end = new_obj.end
646            self.__length_cm = new_obj.length_cm
647            self.__segment_type = new_obj.segment_type
648            return None
649        return new_obj
650
651    def _sanity_check(self) -> None:
652        """
653        Perform sanity checks on the parsed data to ensure data integrity.
654        """
655        n = self.__chrom.shape[0]
656        arrays = [
657            self.__sample_id_1,
658            self.__haplotype_id_1,
659            self.__sample_id_2,
660            self.__haplotype_id_2,
661            self.__start,
662            self.__end,
663        ]
664        if any(arr.shape[0] != n for arr in arrays):
665            raise ValueError("All input arrays must have the same length.")
666
667        if self.__length_cm is not None and self.__length_cm.shape[0] != n:
668            raise ValueError("`length_cm` must have the same length as other arrays.")
669
670        if self.__segment_type is not None and self.__segment_type.shape[0] != n:
671            raise ValueError("`segment_type` must have the same length as other arrays.")
672
673        # Validate haplotype identifiers are 1 or 2, or -1 when unknown
674        valid_values = np.array([1, 2, -1])
675        if not np.isin(self.__haplotype_id_1, valid_values).all() or not np.isin(self.__haplotype_id_2, valid_values).all():
676            raise ValueError("Haplotype identifiers must be in {1, 2} or -1 if unknown.")

A class for Identity-By-Descent (IBD) segment data.

IBDObject( sample_id_1: numpy.ndarray, haplotype_id_1: numpy.ndarray, sample_id_2: numpy.ndarray, haplotype_id_2: numpy.ndarray, chrom: numpy.ndarray, start: numpy.ndarray, end: numpy.ndarray, length_cm: numpy.ndarray | None = None, segment_type: numpy.ndarray | None = None)
17    def __init__(
18        self,
19        sample_id_1: np.ndarray,
20        haplotype_id_1: np.ndarray,
21        sample_id_2: np.ndarray,
22        haplotype_id_2: np.ndarray,
23        chrom: np.ndarray,
24        start: np.ndarray,
25        end: np.ndarray,
26        length_cm: Optional[np.ndarray] = None,
27        segment_type: Optional[np.ndarray] = None,
28    ) -> None:
29        """
30        Args:
31            sample_id_1 (array of shape (n_segments,)): Sample identifiers for the first individual.
32            haplotype_id_1 (array of shape (n_segments,)): Haplotype identifiers for the first individual (values in {1, 2}, or -1 if unknown).
33            sample_id_2 (array of shape (n_segments,)): Sample identifiers for the second individual.
34            haplotype_id_2 (array of shape (n_segments,)): Haplotype identifiers for the second individual (values in {1, 2}, or -1 if unknown).
35            chrom (array of shape (n_segments,)): Chromosome identifier for each IBD segment.
36            start (array of shape (n_segments,)): Start physical position (1-based, bp) for each IBD segment.
37            end (array of shape (n_segments,)): End physical position (1-based, bp) for each IBD segment.
38            length_cm (array of shape (n_segments,), optional): Genetic length (cM) for each segment, if available.
39        """
40        # Store attributes
41        self.__sample_id_1 = np.asarray(sample_id_1)
42        self.__haplotype_id_1 = np.asarray(haplotype_id_1)
43        self.__sample_id_2 = np.asarray(sample_id_2)
44        self.__haplotype_id_2 = np.asarray(haplotype_id_2)
45        self.__chrom = np.asarray(chrom)
46        self.__start = np.asarray(start)
47        self.__end = np.asarray(end)
48        self.__length_cm = None if length_cm is None else np.asarray(length_cm)
49        self.__segment_type = None if segment_type is None else np.asarray(segment_type)
50
51        self._sanity_check()
Arguments:
  • sample_id_1 (array of shape (n_segments,)): Sample identifiers for the first individual.
  • haplotype_id_1 (array of shape (n_segments,)): Haplotype identifiers for the first individual (values in {1, 2}, or -1 if unknown).
  • sample_id_2 (array of shape (n_segments,)): Sample identifiers for the second individual.
  • haplotype_id_2 (array of shape (n_segments,)): Haplotype identifiers for the second individual (values in {1, 2}, or -1 if unknown).
  • chrom (array of shape (n_segments,)): Chromosome identifier for each IBD segment.
  • start (array of shape (n_segments,)): Start physical position (1-based, bp) for each IBD segment.
  • end (array of shape (n_segments,)): End physical position (1-based, bp) for each IBD segment.
  • length_cm (array of shape (n_segments,), optional): Genetic length (cM) for each segment, if available.
sample_id_1: numpy.ndarray
73    @property
74    def sample_id_1(self) -> np.ndarray:
75        """
76        Retrieve `sample_id_1`.
77
78        Returns:
79            **array of shape (n_segments,):** Sample identifiers for the first individual.
80        """
81        return self.__sample_id_1

Retrieve sample_id_1.

Returns:

array of shape (n_segments,): Sample identifiers for the first individual.

haplotype_id_1: numpy.ndarray
90    @property
91    def haplotype_id_1(self) -> np.ndarray:
92        """
93        Retrieve `haplotype_id_1`.
94
95        Returns:
96            **array of shape (n_segments,):** Haplotype identifiers for the first individual (values in {1, 2}).
97        """
98        return self.__haplotype_id_1

Retrieve haplotype_id_1.

Returns:

array of shape (n_segments,): Haplotype identifiers for the first individual (values in {1, 2}).

sample_id_2: numpy.ndarray
107    @property
108    def sample_id_2(self) -> np.ndarray:
109        """
110        Retrieve `sample_id_2`.
111
112        Returns:
113            **array of shape (n_segments,):** Sample identifiers for the second individual.
114        """
115        return self.__sample_id_2

Retrieve sample_id_2.

Returns:

array of shape (n_segments,): Sample identifiers for the second individual.

haplotype_id_2: numpy.ndarray
124    @property
125    def haplotype_id_2(self) -> np.ndarray:
126        """
127        Retrieve `haplotype_id_2`.
128
129        Returns:
130            **array of shape (n_segments,):** Haplotype identifiers for the second individual (values in {1, 2}).
131        """
132        return self.__haplotype_id_2

Retrieve haplotype_id_2.

Returns:

array of shape (n_segments,): Haplotype identifiers for the second individual (values in {1, 2}).

chrom: numpy.ndarray
141    @property
142    def chrom(self) -> np.ndarray:
143        """
144        Retrieve `chrom`.
145
146        Returns:
147            **array of shape (n_segments,):** Chromosome identifier for each IBD segment.
148        """
149        return self.__chrom

Retrieve chrom.

Returns:

array of shape (n_segments,): Chromosome identifier for each IBD segment.

start: numpy.ndarray
158    @property
159    def start(self) -> np.ndarray:
160        """
161        Retrieve `start`.
162
163        Returns:
164            **array of shape (n_segments,):** Start physical position (1-based, bp) for each IBD segment.
165        """
166        return self.__start

Retrieve start.

Returns:

array of shape (n_segments,): Start physical position (1-based, bp) for each IBD segment.

end: numpy.ndarray
175    @property
176    def end(self) -> np.ndarray:
177        """
178        Retrieve `end`.
179
180        Returns:
181            **array of shape (n_segments,):** End physical position (1-based, bp) for each IBD segment.
182        """
183        return self.__end

Retrieve end.

Returns:

array of shape (n_segments,): End physical position (1-based, bp) for each IBD segment.

length_cm: numpy.ndarray | None
192    @property
193    def length_cm(self) -> Optional[np.ndarray]:
194        """
195        Retrieve `length_cm`.
196
197        Returns:
198            **array of shape (n_segments,):** Genetic length (cM) for each segment if available; otherwise None.
199        """
200        return self.__length_cm

Retrieve length_cm.

Returns:

array of shape (n_segments,): Genetic length (cM) for each segment if available; otherwise None.

segment_type: numpy.ndarray | None
209    @property
210    def segment_type(self) -> Optional[np.ndarray]:
211        """
212        Retrieve `segment_type`.
213
214        Returns:
215            **array of shape (n_segments,):** Segment type labels (e.g., 'IBD1', 'IBD2'), or None if unavailable.
216        """
217        return self.__segment_type

Retrieve segment_type.

Returns:

array of shape (n_segments,): Segment type labels (e.g., 'IBD1', 'IBD2'), or None if unavailable.

n_segments: int
226    @property
227    def n_segments(self) -> int:
228        """
229        Retrieve `n_segments`.
230
231        Returns:
232            **int:** The total number of IBD segments.
233        """
234        return self.__chrom.shape[0]

Retrieve n_segments.

Returns:

int: The total number of IBD segments.

pairs: numpy.ndarray
236    @property
237    def pairs(self) -> np.ndarray:
238        """
239        Retrieve `pairs`.
240
241        Returns:
242            **array of shape (n_segments, 2):** Per-segment sample identifier pairs.
243        """
244        return np.column_stack([self.__sample_id_1, self.__sample_id_2])

Retrieve pairs.

Returns:

array of shape (n_segments, 2): Per-segment sample identifier pairs.

haplotype_pairs: numpy.ndarray
246    @property
247    def haplotype_pairs(self) -> np.ndarray:
248        """
249        Retrieve `haplotype_pairs`.
250
251        Returns:
252            **array of shape (n_segments, 2):** Per-segment haplotype identifier pairs.
253        """
254        return np.column_stack([self.__haplotype_id_1, self.__haplotype_id_2])

Retrieve haplotype_pairs.

Returns:

array of shape (n_segments, 2): Per-segment haplotype identifier pairs.

def copy(self) -> 'IBDObject':
256    def copy(self) -> 'IBDObject':
257        """
258        Create and return a copy of `self`.
259
260        Returns:
261            **IBDObject:** A new instance of the current object.
262        """
263        return copy.deepcopy(self)

Create and return a copy of self.

Returns:

IBDObject: A new instance of the current object.

def keys(self) -> List[str]:
265    def keys(self) -> List[str]:
266        """
267        Retrieve a list of public attribute names for `self`.
268
269        Returns:
270            **list of str:** A list of attribute names, with internal name-mangling removed.
271        """
272        return [attr.replace('_IBDObject__', '') for attr in vars(self)]

Retrieve a list of public attribute names for self.

Returns:

list of str: A list of attribute names, with internal name-mangling removed.

def filter_segments( self, chrom: Sequence[str] | None = None, samples: Sequence[str] | None = None, min_length_cm: float | None = None, segment_types: Sequence[str] | None = None, inplace: bool = False) -> ForwardRef('IBDObject') | None:
274    def filter_segments(
275        self,
276        chrom: Optional[Sequence[str]] = None,
277        samples: Optional[Sequence[str]] = None,
278        min_length_cm: Optional[float] = None,
279        segment_types: Optional[Sequence[str]] = None,
280        inplace: bool = False,
281    ) -> Optional['IBDObject']:
282        """
283        Filter IBD segments by chromosome, sample names, and/or minimum genetic length.
284
285        Args:
286            chrom (sequence of str, optional): Chromosome(s) to include.
287            samples (sequence of str, optional): Sample names to include if present in either column.
288            min_length_cm (float, optional): Minimum cM length threshold.
289            inplace (bool, default=False): If True, modifies `self` in place. If False, returns a new `IBDObject`.
290
291        Returns:
292            **Optional[IBDObject]:** A filtered IBDObject if `inplace=False`. If `inplace=True`, returns None.
293        """
294        mask = np.ones(self.n_segments, dtype=bool)
295
296        if chrom is not None:
297            chrom = np.atleast_1d(chrom)
298            mask &= np.isin(self.__chrom, chrom)
299
300        if samples is not None:
301            samples = np.atleast_1d(samples)
302            mask &= np.isin(self.__sample_id_1, samples) | np.isin(self.__sample_id_2, samples)
303
304        if min_length_cm is not None and self.__length_cm is not None:
305            mask &= self.__length_cm >= float(min_length_cm)
306
307        if segment_types is not None and self.__segment_type is not None:
308            segment_types = np.atleast_1d(segment_types)
309            mask &= np.isin(self.__segment_type, segment_types)
310
311        def _apply_mask(x: Optional[np.ndarray]) -> Optional[np.ndarray]:
312            return None if x is None else np.asarray(x)[mask]
313
314        if inplace:
315            self.__sample_id_1 = _apply_mask(self.__sample_id_1)
316            self.__haplotype_id_1 = _apply_mask(self.__haplotype_id_1)
317            self.__sample_id_2 = _apply_mask(self.__sample_id_2)
318            self.__haplotype_id_2 = _apply_mask(self.__haplotype_id_2)
319            self.__chrom = _apply_mask(self.__chrom)
320            self.__start = _apply_mask(self.__start)
321            self.__end = _apply_mask(self.__end)
322            self.__length_cm = _apply_mask(self.__length_cm)
323            self.__segment_type = _apply_mask(self.__segment_type)
324            return None
325        else:
326            return IBDObject(
327                sample_id_1=_apply_mask(self.__sample_id_1),
328                haplotype_id_1=_apply_mask(self.__haplotype_id_1),
329                sample_id_2=_apply_mask(self.__sample_id_2),
330                haplotype_id_2=_apply_mask(self.__haplotype_id_2),
331                chrom=_apply_mask(self.__chrom),
332                start=_apply_mask(self.__start),
333                end=_apply_mask(self.__end),
334                length_cm=_apply_mask(self.__length_cm),
335                segment_type=_apply_mask(self.__segment_type),
336            )

Filter IBD segments by chromosome, sample names, and/or minimum genetic length.

Arguments:
  • chrom (sequence of str, optional): Chromosome(s) to include.
  • samples (sequence of str, optional): Sample names to include if present in either column.
  • min_length_cm (float, optional): Minimum cM length threshold.
  • inplace (bool, default=False): If True, modifies self in place. If False, returns a new IBDObject.
Returns:

Optional[IBDObject]: A filtered IBDObject if inplace=False. If inplace=True, returns None.

def restrict_to_ancestry( self, *, laiobj: Any, ancestry: Any, require_both_haplotypes: bool = False, min_bp: int | None = None, min_cm: float | None = None, inplace: bool = False, method: str = 'clip') -> ForwardRef('IBDObject') | None:
338    def restrict_to_ancestry(
339        self,
340        *,
341        laiobj: Any,
342        ancestry: Any,
343        require_both_haplotypes: bool = False,
344        min_bp: Optional[int] = None,
345        min_cm: Optional[float] = None,
346        inplace: bool = False,
347        method: str = 'clip',
348    ) -> Optional['IBDObject']:
349        """
350        Filter and/or trim IBD segments to intervals where both individuals carry the specified ancestry
351        according to a `LocalAncestryObject`.
352
353        This performs an interval intersection per segment against ancestry tracts:
354        - If haplotype IDs are known (e.g., Hap-IBD), ancestry is checked on the specific
355          haplotype of each individual.
356        - If haplotype IDs are unknown (e.g., ancIBD; haplotype_id_* == -1), ancestry is
357          considered present for an individual if at least one of their haplotypes matches
358          the requested ancestry (unless `require_both_haplotypes=True`).
359
360        Method 'strict':
361            Drop entire IBD segments if ANY overlapping LAI window contains non-target ancestry
362            for either individual. No trimming occurs - segments are kept whole or dropped completely.
363
364        Method 'clip':
365            Trim IBD segments to contiguous regions where both individuals have the target ancestry.
366            Resulting subsegments are clipped to LAI window boundaries and original IBD start/end,
367            with optional length filtering by bp or cM.
368
369        Args:
370            laiobj: LocalAncestryObject containing 2D `lai` of shape (n_windows, n_haplotypes),
371                `physical_pos` (n_windows, 2), and `chromosomes` (n_windows,).
372            ancestry: Target ancestry code or label. Compared as string, so both int and str work.
373            require_both_haplotypes: If True, require both haplotypes of each individual to have
374                the target ancestry within a window. When haplotypes are known per segment, this
375                only affects cases with unknown haplotypes (== -1) or IBD2 segments.
376            min_bp: Minimum base-pair length to retain a segment (strict) or subsegment (clip).
377            min_cm: Minimum centiMorgan length to retain a segment (strict) or subsegment (clip).
378            inplace: If True, replace `self` with the restricted object; else return a new object.
379            method: Method to use for filtering. 'strict' drops entire segments that overlap with
380                non-target ancestry. 'clip' trims segments to target ancestry regions.
381
382        Returns:
383            Optional[IBDObject]: A restricted IBDObject if `inplace=False`. If `inplace=True`,
384                returns None.
385        """
386        if method not in ['strict', 'clip']:
387            raise ValueError(f"Method must be 'strict' or 'clip', got '{method}'")
388
389        # Basic LAI shape/metadata checks
390        lai = getattr(laiobj, 'lai', None)
391        physical_pos = getattr(laiobj, 'physical_pos', None)
392        chromosomes = getattr(laiobj, 'chromosomes', None)
393        centimorgan_pos = getattr(laiobj, 'centimorgan_pos', None)
394        haplotypes = getattr(laiobj, 'haplotypes', None)
395
396        if lai is None or physical_pos is None or chromosomes is None or haplotypes is None:
397            raise ValueError(
398                "`laiobj` must provide `lai`, `physical_pos`, `chromosomes`, and `haplotypes`."
399            )
400
401        if lai.ndim != 2:
402            raise ValueError("`laiobj.lai` must be 2D with shape (n_windows, n_haplotypes).")
403
404        # Build haplotype label -> column index map (labels like 'Sample.0', 'Sample.1')
405        hap_to_col = {str(h): i for i, h in enumerate(haplotypes)}
406
407        # Coerce ancestry to str for robust comparisons
408        anc_str = str(ancestry)
409
410        # Coerce LAI values to str once for comparisons
411        lai_str = lai.astype(str)
412
413        # Prepare arrays for the restricted segments
414        out_sample_id_1: List[str] = []
415        out_haplotype_id_1: List[int] = []
416        out_sample_id_2: List[str] = []
417        out_haplotype_id_2: List[int] = []
418        out_chrom: List[str] = []
419        out_start: List[int] = []
420        out_end: List[int] = []
421        out_length_cm: List[float] = []
422        out_segment_type: List[str] = [] if self.__segment_type is not None else None  # type: ignore
423
424        # Vectorize chrom compare by making LAI chromosome strings
425        chr_lai = np.asarray(chromosomes).astype(str)
426
427        # Helper to compute cM length for a trimmed interval using LAI windows
428        def _approx_cm_len(chr_mask: np.ndarray, start_bp: int, end_bp: int) -> Optional[float]:
429            if centimorgan_pos is None:
430                return None
431            win_st = physical_pos[chr_mask, 0]
432            win_en = physical_pos[chr_mask, 1]
433            win_cm_st = centimorgan_pos[chr_mask, 0]
434            win_cm_en = centimorgan_pos[chr_mask, 1]
435            cm_total = 0.0
436            for ws, we, cs, ce in zip(win_st, win_en, win_cm_st, win_cm_en):
437                # Overlap with [start_bp, end_bp]
438                overlap_start = max(int(ws), int(start_bp))
439                overlap_end = min(int(we), int(end_bp))
440                if overlap_start > overlap_end:
441                    continue
442                wlen_bp = max(1, int(we) - int(ws) + 1)
443                olen_bp = int(overlap_end) - int(overlap_start) + 1
444                frac = float(olen_bp) / float(wlen_bp)
445                cm_total += frac * float(ce - cs)
446            return cm_total
447
448        # Iterate over segments
449        for i in range(self.n_segments):
450            chrom = str(self.__chrom[i])
451            seg_start = int(self.__start[i])
452            seg_end = int(self.__end[i])
453            if seg_end < seg_start:
454                continue
455
456            # Subset LAI windows on this chromosome that overlap the segment
457            idx_chr = (chr_lai == chrom)
458            if not np.any(idx_chr):
459                continue
460            lai_st = physical_pos[idx_chr, 0]
461            lai_en = physical_pos[idx_chr, 1]
462            overlaps = (lai_en >= seg_start) & (lai_st <= seg_end)
463            if not np.any(overlaps):
464                continue
465
466            # Build per-window ancestry mask for both individuals
467            s1 = str(self.__sample_id_1[i])
468            s2 = str(self.__sample_id_2[i])
469            h1 = int(self.__haplotype_id_1[i]) if self.__haplotype_id_1 is not None else -1
470            h2 = int(self.__haplotype_id_2[i]) if self.__haplotype_id_2 is not None else -1
471
472            # Resolve haplotype column indices for each sample
473            # Known haplotypes are 1-based in inputs; convert to {0,1}
474            def _get_cols(sample: str) -> Tuple[int, int]:
475                a = hap_to_col.get(f"{sample}.0")
476                b = hap_to_col.get(f"{sample}.1")
477                if a is None or b is None:
478                    raise ValueError(f"Sample '{sample}' not found in LAI haplotypes.")
479                return a, b
480
481            s1_a, s1_b = _get_cols(s1)
482            s2_a, s2_b = _get_cols(s2)
483
484            # LAI rows for this chromosome
485            lai_rows = lai_str[idx_chr, :]
486
487            # Determine ancestry presence per window for each individual
488            if h1 in (1, 2) and h2 in (1, 2):
489                # Use specific haplotypes
490                s1_col = s1_a if (h1 - 1) == 0 else s1_b
491                s2_col = s2_a if (h2 - 1) == 0 else s2_b
492                s1_mask = (lai_rows[:, s1_col] == anc_str)
493                s2_mask = (lai_rows[:, s2_col] == anc_str)
494                if require_both_haplotypes:
495                    # Additionally require the other hap of each sample to match
496                    s1_other = s1_b if s1_col == s1_a else s1_a
497                    s2_other = s2_b if s2_col == s2_a else s2_a
498                    s1_mask = s1_mask & (lai_rows[:, s1_other] == anc_str)
499                    s2_mask = s2_mask & (lai_rows[:, s2_other] == anc_str)
500            else:
501                # Unknown hap IDs: require at least one hap to match (or both if requested)
502                if require_both_haplotypes:
503                    s1_mask = (lai_rows[:, s1_a] == anc_str) & (lai_rows[:, s1_b] == anc_str)
504                    s2_mask = (lai_rows[:, s2_a] == anc_str) & (lai_rows[:, s2_b] == anc_str)
505                else:
506                    s1_mask = (lai_rows[:, s1_a] == anc_str) | (lai_rows[:, s1_b] == anc_str)
507                    s2_mask = (lai_rows[:, s2_a] == anc_str) | (lai_rows[:, s2_b] == anc_str)
508
509            keep = overlaps & s1_mask & s2_mask
510
511            if method == 'strict':
512                # In strict mode, ALL overlapping windows must have target ancestry
513                if not np.array_equal(overlaps, keep):
514                    continue  # Drop entire segment
515
516                # Apply length filters to original segment
517                if min_bp is not None and (seg_end - seg_start + 1) < int(min_bp):
518                    continue
519
520                # In strict mode, preserve original length_cm
521                cm_len = float(self.__length_cm[i]) if self.__length_cm is not None else None
522
523                if min_cm is not None:
524                    if cm_len is None or cm_len < float(min_cm):
525                        continue
526
527                # Keep entire original segment
528                out_sample_id_1.append(s1)
529                out_sample_id_2.append(s2)
530                out_haplotype_id_1.append(h1)
531                out_haplotype_id_2.append(h2)
532                out_chrom.append(chrom)
533                out_start.append(seg_start)
534                out_end.append(seg_end)
535                out_length_cm.append(float(cm_len) if cm_len is not None else float('nan'))
536                if out_segment_type is not None:
537                    out_segment_type.append(str(self.__segment_type[i]))  # type: ignore
538
539            else:  # method == 'clip'
540                if not np.any(keep):
541                    continue
542
543                # Identify contiguous windows where keep=True
544                idx_keep = np.where(keep)[0]
545                # Split into runs of consecutive indices
546                breaks = np.where(np.diff(idx_keep) > 1)[0]
547                run_starts = np.r_[0, breaks + 1]
548                run_ends = np.r_[breaks, idx_keep.size - 1]
549
550                # Create subsegments for each contiguous run
551                for rs, re in zip(run_starts, run_ends):
552                    i0 = idx_keep[rs]
553                    i1 = idx_keep[re]
554                    sub_start = int(max(seg_start, int(lai_st[i0])))
555                    sub_end = int(min(seg_end, int(lai_en[i1])))
556                    if sub_end < sub_start:
557                        continue
558
559                    # Length filters: bp first
560                    if min_bp is not None and (sub_end - sub_start + 1) < int(min_bp):
561                        continue
562
563                    # Compute cM length if possible, else approximate or None
564                    cm_len = _approx_cm_len(idx_chr, sub_start, sub_end)
565                    if cm_len is None and self.__length_cm is not None:
566                        # Scale the original segment length by bp fraction
567                        total_bp = max(1, int(seg_end - seg_start + 1))
568                        frac_bp = float(sub_end - sub_start + 1) / float(total_bp)
569                        try:
570                            cm_len = float(self.__length_cm[i]) * frac_bp
571                        except Exception:
572                            cm_len = None
573
574                    # Apply cM filter if requested (treat None as 0)
575                    if min_cm is not None:
576                        if cm_len is None or cm_len < float(min_cm):
577                            continue
578
579                    # Append trimmed segment
580                    out_sample_id_1.append(s1)
581                    out_sample_id_2.append(s2)
582                    out_haplotype_id_1.append(h1)
583                    out_haplotype_id_2.append(h2)
584                    out_chrom.append(chrom)
585                    out_start.append(sub_start)
586                    out_end.append(sub_end)
587                    out_length_cm.append(float(cm_len) if cm_len is not None else float('nan'))
588                    if out_segment_type is not None:
589                        out_segment_type.append(str(self.__segment_type[i]))  # type: ignore
590
591        # If nothing remains, return empty object with zero segments
592        if len(out_start) == 0:
593            # Build minimal arrays
594            empty = IBDObject(
595                sample_id_1=np.array([], dtype=object),
596                haplotype_id_1=np.array([], dtype=int),
597                sample_id_2=np.array([], dtype=object),
598                haplotype_id_2=np.array([], dtype=int),
599                chrom=np.array([], dtype=object),
600                start=np.array([], dtype=int),
601                end=np.array([], dtype=int),
602                length_cm=None,
603                segment_type=None if out_segment_type is None else np.array([], dtype=object),
604            )
605            if inplace:
606                self.__sample_id_1 = empty.sample_id_1
607                self.__haplotype_id_1 = empty.haplotype_id_1
608                self.__sample_id_2 = empty.sample_id_2
609                self.__haplotype_id_2 = empty.haplotype_id_2
610                self.__chrom = empty.chrom
611                self.__start = empty.start
612                self.__end = empty.end
613                self.__length_cm = empty.length_cm
614                self.__segment_type = empty.segment_type
615                return None
616            return empty
617
618        # Assemble outputs
619        out_length_array: Optional[np.ndarray]
620        if len(out_length_cm) > 0:
621            # Convert NaNs to None-equivalent by using np.array with dtype float
622            out_length_array = np.asarray(out_length_cm, dtype=float)
623        else:
624            out_length_array = None
625
626        new_obj = IBDObject(
627            sample_id_1=np.asarray(out_sample_id_1, dtype=object),
628            haplotype_id_1=np.asarray(out_haplotype_id_1, dtype=int),
629            sample_id_2=np.asarray(out_sample_id_2, dtype=object),
630            haplotype_id_2=np.asarray(out_haplotype_id_2, dtype=int),
631            chrom=np.asarray(out_chrom, dtype=object),
632            start=np.asarray(out_start, dtype=int),
633            end=np.asarray(out_end, dtype=int),
634            length_cm=out_length_array,
635            segment_type=None if out_segment_type is None else np.asarray(out_segment_type, dtype=object),
636        )
637
638        if inplace:
639            self.__sample_id_1 = new_obj.sample_id_1
640            self.__haplotype_id_1 = new_obj.haplotype_id_1
641            self.__sample_id_2 = new_obj.sample_id_2
642            self.__haplotype_id_2 = new_obj.haplotype_id_2
643            self.__chrom = new_obj.chrom
644            self.__start = new_obj.start
645            self.__end = new_obj.end
646            self.__length_cm = new_obj.length_cm
647            self.__segment_type = new_obj.segment_type
648            return None
649        return new_obj

Filter and/or trim IBD segments to intervals where both individuals carry the specified ancestry according to a LocalAncestryObject.

This performs an interval intersection per segment against ancestry tracts:

  • If haplotype IDs are known (e.g., Hap-IBD), ancestry is checked on the specific haplotype of each individual.
  • If haplotype IDs are unknown (e.g., ancIBD; haplotype_id_* == -1), ancestry is considered present for an individual if at least one of their haplotypes matches the requested ancestry (unless require_both_haplotypes=True).

Method 'strict': Drop entire IBD segments if ANY overlapping LAI window contains non-target ancestry for either individual. No trimming occurs - segments are kept whole or dropped completely.

Method 'clip': Trim IBD segments to contiguous regions where both individuals have the target ancestry. Resulting subsegments are clipped to LAI window boundaries and original IBD start/end, with optional length filtering by bp or cM.

Arguments:
  • laiobj: LocalAncestryObject containing 2D lai of shape (n_windows, n_haplotypes), physical_pos (n_windows, 2), and chromosomes (n_windows,).
  • ancestry: Target ancestry code or label. Compared as string, so both int and str work.
  • require_both_haplotypes: If True, require both haplotypes of each individual to have the target ancestry within a window. When haplotypes are known per segment, this only affects cases with unknown haplotypes (== -1) or IBD2 segments.
  • min_bp: Minimum base-pair length to retain a segment (strict) or subsegment (clip).
  • min_cm: Minimum centiMorgan length to retain a segment (strict) or subsegment (clip).
  • inplace: If True, replace self with the restricted object; else return a new object.
  • method: Method to use for filtering. 'strict' drops entire segments that overlap with non-target ancestry. 'clip' trims segments to target ancestry regions.
Returns:

Optional[IBDObject]: A restricted IBDObject if inplace=False. If inplace=True, returns None.

def read_ibd( file: str | pathlib.Path, **kwargs) -> IBDObject:
 8def read_ibd(file: Union[str, Path], **kwargs) -> IBDObject:
 9    """
10    Automatically detect the IBD data file format from the file's extension and read it into an `IBDObject`.
11
12    Supported formats:
13    - Hap-IBD (no standard extension; defaults to tab-delimited columns without header).
14    - ancIBD (template only).
15
16    Args:
17        file (str or pathlib.Path): Path to the file to be read.
18        **kwargs: Additional arguments passed to the reader method.
19    """
20    from snputils.ibd.io.read.auto import IBDReader
21
22    return IBDReader(file).read(**kwargs)

Automatically detect the IBD data file format from the file's extension and read it into an IBDObject.

Supported formats:

  • Hap-IBD (no standard extension; defaults to tab-delimited columns without header).
  • ancIBD (template only).
Arguments:
  • file (str or pathlib.Path): Path to the file to be read.
  • **kwargs: Additional arguments passed to the reader method.
class HapIBDReader(snputils.ibd.io.read.base.IBDBaseReader):
 18class HapIBDReader(IBDBaseReader):
 19    """
 20    Reads an IBD file in Hap-IBD format and processes it into an `IBDObject`.
 21    """
 22
 23    def read(self, separator: Optional[str] = None) -> IBDObject:
 24        """
 25        Read a Hap-IBD file into an `IBDObject`.
 26
 27        The Hap-IBD format is a delimited text without a header with columns:
 28        sample_id_1, haplotype_id_1, sample_id_2, haplotype_id_2, chromosome, start, end, length_cm
 29
 30        Notes:
 31        - Haplotype identifiers are 1-based and take values in {1, 2}.
 32
 33        Args:
 34            separator (str, optional): Field delimiter. If None, whitespace (any number of spaces or tabs) is assumed.
 35
 36        Returns:
 37            **IBDObject**: An IBDObject instance.
 38        """
 39        log.info(f"Reading {self.file}")
 40
 41        # Column names for Hap-IBD files (no header present in input)
 42        col_names = [
 43            'sample_id_1', 'haplotype_id_1', 'sample_id_2', 'haplotype_id_2',
 44            'chrom', 'start', 'end', 'length_cm'
 45        ]
 46
 47        # Detect gzip by extension
 48        is_gz = str(self.file).endswith('.gz')
 49
 50        # If separator is None, treat as whitespace-delimited (any spaces or tabs)
 51        if separator is None:
 52            # Polars doesn't support regex separators; normalize whitespace to single tabs before parsing
 53            if is_gz:
 54                with gzip.open(self.file, 'rt') as f:
 55                    lines = [re.sub(r"\s+", "\t", line.strip()) for line in f if line.strip()]
 56            else:
 57                with open(self.file, 'r') as f:
 58                    lines = [re.sub(r"\s+", "\t", line.strip()) for line in f if line.strip()]
 59
 60            data = StringIO("\n".join(lines))
 61            df = pl.read_csv(
 62                source=data,
 63                has_header=False,
 64                separator='\t',
 65                new_columns=col_names,
 66                schema_overrides={
 67                    'sample_id_1': pl.Utf8,
 68                    'haplotype_id_1': pl.Int8,
 69                    'sample_id_2': pl.Utf8,
 70                    'haplotype_id_2': pl.Int8,
 71                    'chrom': pl.Utf8,
 72                    'start': pl.Int64,
 73                    'end': pl.Int64,
 74                    'length_cm': pl.Float64,
 75                },
 76            )
 77        else:
 78            df = pl.read_csv(
 79                source=str(self.file),
 80                has_header=False,
 81                separator=separator,
 82                new_columns=col_names,
 83                schema_overrides={
 84                    'sample_id_1': pl.Utf8,
 85                    'haplotype_id_1': pl.Int8,
 86                    'sample_id_2': pl.Utf8,
 87                    'haplotype_id_2': pl.Int8,
 88                    'chrom': pl.Utf8,
 89                    'start': pl.Int64,
 90                    'end': pl.Int64,
 91                    'length_cm': pl.Float64,
 92                },
 93            )
 94
 95        ibdobj = IBDObject(
 96            sample_id_1=df['sample_id_1'].to_numpy(),
 97            haplotype_id_1=df['haplotype_id_1'].to_numpy(),
 98            sample_id_2=df['sample_id_2'].to_numpy(),
 99            haplotype_id_2=df['haplotype_id_2'].to_numpy(),
100            chrom=df['chrom'].to_numpy(),
101            start=df['start'].to_numpy(),
102            end=df['end'].to_numpy(),
103            length_cm=df['length_cm'].to_numpy(),
104            segment_type=np.array(["IBD1"] * df.height),  # hap-IBD does not distinguish; treat as IBD1
105        )
106
107        log.info(f"Finished reading {self.file}")
108
109        return ibdobj

Reads an IBD file in Hap-IBD format and processes it into an IBDObject.

def read( self, separator: str | None = None) -> IBDObject:
 23    def read(self, separator: Optional[str] = None) -> IBDObject:
 24        """
 25        Read a Hap-IBD file into an `IBDObject`.
 26
 27        The Hap-IBD format is a delimited text without a header with columns:
 28        sample_id_1, haplotype_id_1, sample_id_2, haplotype_id_2, chromosome, start, end, length_cm
 29
 30        Notes:
 31        - Haplotype identifiers are 1-based and take values in {1, 2}.
 32
 33        Args:
 34            separator (str, optional): Field delimiter. If None, whitespace (any number of spaces or tabs) is assumed.
 35
 36        Returns:
 37            **IBDObject**: An IBDObject instance.
 38        """
 39        log.info(f"Reading {self.file}")
 40
 41        # Column names for Hap-IBD files (no header present in input)
 42        col_names = [
 43            'sample_id_1', 'haplotype_id_1', 'sample_id_2', 'haplotype_id_2',
 44            'chrom', 'start', 'end', 'length_cm'
 45        ]
 46
 47        # Detect gzip by extension
 48        is_gz = str(self.file).endswith('.gz')
 49
 50        # If separator is None, treat as whitespace-delimited (any spaces or tabs)
 51        if separator is None:
 52            # Polars doesn't support regex separators; normalize whitespace to single tabs before parsing
 53            if is_gz:
 54                with gzip.open(self.file, 'rt') as f:
 55                    lines = [re.sub(r"\s+", "\t", line.strip()) for line in f if line.strip()]
 56            else:
 57                with open(self.file, 'r') as f:
 58                    lines = [re.sub(r"\s+", "\t", line.strip()) for line in f if line.strip()]
 59
 60            data = StringIO("\n".join(lines))
 61            df = pl.read_csv(
 62                source=data,
 63                has_header=False,
 64                separator='\t',
 65                new_columns=col_names,
 66                schema_overrides={
 67                    'sample_id_1': pl.Utf8,
 68                    'haplotype_id_1': pl.Int8,
 69                    'sample_id_2': pl.Utf8,
 70                    'haplotype_id_2': pl.Int8,
 71                    'chrom': pl.Utf8,
 72                    'start': pl.Int64,
 73                    'end': pl.Int64,
 74                    'length_cm': pl.Float64,
 75                },
 76            )
 77        else:
 78            df = pl.read_csv(
 79                source=str(self.file),
 80                has_header=False,
 81                separator=separator,
 82                new_columns=col_names,
 83                schema_overrides={
 84                    'sample_id_1': pl.Utf8,
 85                    'haplotype_id_1': pl.Int8,
 86                    'sample_id_2': pl.Utf8,
 87                    'haplotype_id_2': pl.Int8,
 88                    'chrom': pl.Utf8,
 89                    'start': pl.Int64,
 90                    'end': pl.Int64,
 91                    'length_cm': pl.Float64,
 92                },
 93            )
 94
 95        ibdobj = IBDObject(
 96            sample_id_1=df['sample_id_1'].to_numpy(),
 97            haplotype_id_1=df['haplotype_id_1'].to_numpy(),
 98            sample_id_2=df['sample_id_2'].to_numpy(),
 99            haplotype_id_2=df['haplotype_id_2'].to_numpy(),
100            chrom=df['chrom'].to_numpy(),
101            start=df['start'].to_numpy(),
102            end=df['end'].to_numpy(),
103            length_cm=df['length_cm'].to_numpy(),
104            segment_type=np.array(["IBD1"] * df.height),  # hap-IBD does not distinguish; treat as IBD1
105        )
106
107        log.info(f"Finished reading {self.file}")
108
109        return ibdobj

Read a Hap-IBD file into an IBDObject.

The Hap-IBD format is a delimited text without a header with columns: sample_id_1, haplotype_id_1, sample_id_2, haplotype_id_2, chromosome, start, end, length_cm

Notes:

  • Haplotype identifiers are 1-based and take values in {1, 2}.
Arguments:
  • separator (str, optional): Field delimiter. If None, whitespace (any number of spaces or tabs) is assumed.
Returns:

IBDObject: An IBDObject instance.

class AncIBDReader(snputils.ibd.io.read.base.IBDBaseReader):
 17class AncIBDReader(IBDBaseReader):
 18    """
 19    Reads IBD data from ancIBD outputs (TSV), accepting a file (`ch_all.tsv` or `ch*.tsv`) or a directory.
 20    """
 21
 22    def read(
 23        self,
 24        path: Optional[Union[str, Path]] = None,
 25        include_segment_types: Optional[Sequence[str]] = ("IBD1", "IBD2"),
 26    ) -> IBDObject:
 27        """
 28        Read ancIBD outputs and convert to `IBDObject`.
 29
 30        Inputs accepted:
 31        - A single TSV (optionally gzipped), e.g. `ch_all.tsv[.gz]` or `ch{CHR}.tsv[.gz]`.
 32        - A directory containing per-chromosome TSVs or `ch_all.tsv`.
 33
 34        Column schema (tab-separated with header):
 35        iid1, iid2, ch, Start, End, length, StartM, EndM, lengthM, StartBP, EndBP, segment_type
 36
 37        Notes:
 38        - Haplotype indices are not provided by ancIBD; set to -1.
 39        - Positions in IBDObject use base-pair StartBP/EndBP.
 40        - Length uses centiMorgan as `lengthM * 100`.
 41
 42        Args:
 43            path (str or Path, optional): Override input path. Defaults to `self.file`.
 44            include_segment_types (sequence of str, optional): Filter by `segment_type` (e.g., IBD1, IBD2). None to disable.
 45
 46        Returns:
 47            **IBDObject**: An IBDObject instance.
 48        """
 49        p = Path(path) if path is not None else Path(self.file)
 50        log.info(f"Reading ancIBD from {p}")
 51
 52        files: list[Path]
 53        if p.is_dir():
 54            # Prefer combined file if present, else gather per-chromosome files
 55            combined = p / "ch_all.tsv"
 56            combined_gz = p / "ch_all.tsv.gz"
 57            if combined.exists():
 58                files = [combined]
 59            elif combined_gz.exists():
 60                files = [combined_gz]
 61            else:
 62                files = sorted(list(p.glob("ch*.tsv")) + list(p.glob("ch*.tsv.gz")))
 63                if not files:
 64                    raise FileNotFoundError("No ancIBD output files found in directory.")
 65        else:
 66            files = [p]
 67
 68        frames = []
 69        schema_overrides = {
 70            "iid1": pl.Utf8,
 71            "iid2": pl.Utf8,
 72            "ch": pl.Utf8,
 73            "Start": pl.Int64,
 74            "End": pl.Int64,
 75            "length": pl.Int64,  # marker span; not used
 76            "StartM": pl.Float64,
 77            "EndM": pl.Float64,
 78            "lengthM": pl.Float64,
 79            "StartBP": pl.Int64,
 80            "EndBP": pl.Int64,
 81            "segment_type": pl.Utf8,
 82        }
 83
 84        for f in files:
 85            frame = pl.read_csv(str(f), separator="\t", has_header=True, schema_overrides=schema_overrides)
 86            frames.append(frame)
 87
 88        df = pl.concat(frames, how="vertical") if len(frames) > 1 else frames[0]
 89
 90        if include_segment_types is not None:
 91            df = df.filter(pl.col("segment_type").is_in(list(include_segment_types)))
 92
 93        # Map columns to IBDObject schema
 94        sample_id_1 = df["iid1"].to_numpy()
 95        sample_id_2 = df["iid2"].to_numpy()
 96        chrom = df["ch"].to_numpy()
 97        start_bp = df["StartBP"].to_numpy()
 98        end_bp = df["EndBP"].to_numpy()
 99        length_cm = (df["lengthM"] * 100.0).to_numpy()
100
101        # ancIBD doesn't include haplotype indices; set to -1
102        hap1 = np.full(sample_id_1.shape[0], -1, dtype=np.int8)
103        hap2 = np.full(sample_id_2.shape[0], -1, dtype=np.int8)
104
105        ibdobj = IBDObject(
106            sample_id_1=sample_id_1,
107            haplotype_id_1=hap1,
108            sample_id_2=sample_id_2,
109            haplotype_id_2=hap2,
110            chrom=chrom,
111            start=start_bp,
112            end=end_bp,
113            length_cm=length_cm,
114            segment_type=df["segment_type"].to_numpy(),
115        )
116
117        log.info(f"Finished reading ancIBD from {p}")
118        return ibdobj

Reads IBD data from ancIBD outputs (TSV), accepting a file (ch_all.tsv or ch*.tsv) or a directory.

def read( self, path: str | pathlib.Path | None = None, include_segment_types: Sequence[str] | None = ('IBD1', 'IBD2')) -> IBDObject:
 22    def read(
 23        self,
 24        path: Optional[Union[str, Path]] = None,
 25        include_segment_types: Optional[Sequence[str]] = ("IBD1", "IBD2"),
 26    ) -> IBDObject:
 27        """
 28        Read ancIBD outputs and convert to `IBDObject`.
 29
 30        Inputs accepted:
 31        - A single TSV (optionally gzipped), e.g. `ch_all.tsv[.gz]` or `ch{CHR}.tsv[.gz]`.
 32        - A directory containing per-chromosome TSVs or `ch_all.tsv`.
 33
 34        Column schema (tab-separated with header):
 35        iid1, iid2, ch, Start, End, length, StartM, EndM, lengthM, StartBP, EndBP, segment_type
 36
 37        Notes:
 38        - Haplotype indices are not provided by ancIBD; set to -1.
 39        - Positions in IBDObject use base-pair StartBP/EndBP.
 40        - Length uses centiMorgan as `lengthM * 100`.
 41
 42        Args:
 43            path (str or Path, optional): Override input path. Defaults to `self.file`.
 44            include_segment_types (sequence of str, optional): Filter by `segment_type` (e.g., IBD1, IBD2). None to disable.
 45
 46        Returns:
 47            **IBDObject**: An IBDObject instance.
 48        """
 49        p = Path(path) if path is not None else Path(self.file)
 50        log.info(f"Reading ancIBD from {p}")
 51
 52        files: list[Path]
 53        if p.is_dir():
 54            # Prefer combined file if present, else gather per-chromosome files
 55            combined = p / "ch_all.tsv"
 56            combined_gz = p / "ch_all.tsv.gz"
 57            if combined.exists():
 58                files = [combined]
 59            elif combined_gz.exists():
 60                files = [combined_gz]
 61            else:
 62                files = sorted(list(p.glob("ch*.tsv")) + list(p.glob("ch*.tsv.gz")))
 63                if not files:
 64                    raise FileNotFoundError("No ancIBD output files found in directory.")
 65        else:
 66            files = [p]
 67
 68        frames = []
 69        schema_overrides = {
 70            "iid1": pl.Utf8,
 71            "iid2": pl.Utf8,
 72            "ch": pl.Utf8,
 73            "Start": pl.Int64,
 74            "End": pl.Int64,
 75            "length": pl.Int64,  # marker span; not used
 76            "StartM": pl.Float64,
 77            "EndM": pl.Float64,
 78            "lengthM": pl.Float64,
 79            "StartBP": pl.Int64,
 80            "EndBP": pl.Int64,
 81            "segment_type": pl.Utf8,
 82        }
 83
 84        for f in files:
 85            frame = pl.read_csv(str(f), separator="\t", has_header=True, schema_overrides=schema_overrides)
 86            frames.append(frame)
 87
 88        df = pl.concat(frames, how="vertical") if len(frames) > 1 else frames[0]
 89
 90        if include_segment_types is not None:
 91            df = df.filter(pl.col("segment_type").is_in(list(include_segment_types)))
 92
 93        # Map columns to IBDObject schema
 94        sample_id_1 = df["iid1"].to_numpy()
 95        sample_id_2 = df["iid2"].to_numpy()
 96        chrom = df["ch"].to_numpy()
 97        start_bp = df["StartBP"].to_numpy()
 98        end_bp = df["EndBP"].to_numpy()
 99        length_cm = (df["lengthM"] * 100.0).to_numpy()
100
101        # ancIBD doesn't include haplotype indices; set to -1
102        hap1 = np.full(sample_id_1.shape[0], -1, dtype=np.int8)
103        hap2 = np.full(sample_id_2.shape[0], -1, dtype=np.int8)
104
105        ibdobj = IBDObject(
106            sample_id_1=sample_id_1,
107            haplotype_id_1=hap1,
108            sample_id_2=sample_id_2,
109            haplotype_id_2=hap2,
110            chrom=chrom,
111            start=start_bp,
112            end=end_bp,
113            length_cm=length_cm,
114            segment_type=df["segment_type"].to_numpy(),
115        )
116
117        log.info(f"Finished reading ancIBD from {p}")
118        return ibdobj

Read ancIBD outputs and convert to IBDObject.

Inputs accepted:

  • A single TSV (optionally gzipped), e.g. ch_all.tsv[.gz] or ch{CHR}.tsv[.gz].
  • A directory containing per-chromosome TSVs or ch_all.tsv.

Column schema (tab-separated with header): iid1, iid2, ch, Start, End, length, StartM, EndM, lengthM, StartBP, EndBP, segment_type

Notes:

  • Haplotype indices are not provided by ancIBD; set to -1.
  • Positions in IBDObject use base-pair StartBP/EndBP.
  • Length uses centiMorgan as lengthM * 100.
Arguments:
  • path (str or Path, optional): Override input path. Defaults to self.file.
  • include_segment_types (sequence of str, optional): Filter by segment_type (e.g., IBD1, IBD2). None to disable.
Returns:

IBDObject: An IBDObject instance.

class IBDReader:
 8class IBDReader:
 9    def __new__(
10        cls,
11        file: Union[str, Path]
12    ) -> object:
13        """
14        A factory class that attempts to detect the IBD file format and returns the corresponding reader.
15
16        Supported detections:
17        - Hap-IBD: *.ibd or *.ibd.gz (headerless, 8 columns)
18        - ancIBD: directories with `ch_all.tsv`/`ch*.tsv` or files *.tsv / *.tsv.gz with ancIBD schema
19        """
20        file = Path(file)
21        suffixes = [s.lower() for s in file.suffixes]
22
23        # Directory-based detection for ancIBD
24        if file.is_dir():
25            if (file / 'ch_all.tsv').exists() or (file / 'ch_all.tsv.gz').exists():
26                from snputils.ibd.io.read.anc_ibd import AncIBDReader
27                return AncIBDReader(file)
28            has_chr_files = list(file.glob('ch*.tsv')) or list(file.glob('ch*.tsv.gz'))
29            if has_chr_files:
30                from snputils.ibd.io.read.anc_ibd import AncIBDReader
31                return AncIBDReader(file)
32            # Fallback to HapIBD if nothing matches
33            from snputils.ibd.io.read.hap_ibd import HapIBDReader
34            return HapIBDReader(file)
35
36        # File-based detection
37        if suffixes[-2:] == ['.ibd', '.gz'] or suffixes[-1:] == ['.ibd']:
38            from snputils.ibd.io.read.hap_ibd import HapIBDReader
39            return HapIBDReader(file)
40        if suffixes[-2:] == ['.tsv', '.gz'] or suffixes[-1:] == ['.tsv']:
41            from snputils.ibd.io.read.anc_ibd import AncIBDReader
42            return AncIBDReader(file)
43
44        # Default to HapIBDReader (most tools use .ibd[.gz])
45        from snputils.ibd.io.read.hap_ibd import HapIBDReader
46        return HapIBDReader(file)
IBDReader(file: 'Union[str, Path]')
 9    def __new__(
10        cls,
11        file: Union[str, Path]
12    ) -> object:
13        """
14        A factory class that attempts to detect the IBD file format and returns the corresponding reader.
15
16        Supported detections:
17        - Hap-IBD: *.ibd or *.ibd.gz (headerless, 8 columns)
18        - ancIBD: directories with `ch_all.tsv`/`ch*.tsv` or files *.tsv / *.tsv.gz with ancIBD schema
19        """
20        file = Path(file)
21        suffixes = [s.lower() for s in file.suffixes]
22
23        # Directory-based detection for ancIBD
24        if file.is_dir():
25            if (file / 'ch_all.tsv').exists() or (file / 'ch_all.tsv.gz').exists():
26                from snputils.ibd.io.read.anc_ibd import AncIBDReader
27                return AncIBDReader(file)
28            has_chr_files = list(file.glob('ch*.tsv')) or list(file.glob('ch*.tsv.gz'))
29            if has_chr_files:
30                from snputils.ibd.io.read.anc_ibd import AncIBDReader
31                return AncIBDReader(file)
32            # Fallback to HapIBD if nothing matches
33            from snputils.ibd.io.read.hap_ibd import HapIBDReader
34            return HapIBDReader(file)
35
36        # File-based detection
37        if suffixes[-2:] == ['.ibd', '.gz'] or suffixes[-1:] == ['.ibd']:
38            from snputils.ibd.io.read.hap_ibd import HapIBDReader
39            return HapIBDReader(file)
40        if suffixes[-2:] == ['.tsv', '.gz'] or suffixes[-1:] == ['.tsv']:
41            from snputils.ibd.io.read.anc_ibd import AncIBDReader
42            return AncIBDReader(file)
43
44        # Default to HapIBDReader (most tools use .ibd[.gz])
45        from snputils.ibd.io.read.hap_ibd import HapIBDReader
46        return HapIBDReader(file)

A factory class that attempts to detect the IBD file format and returns the corresponding reader.

Supported detections:

  • Hap-IBD: *.ibd or *.ibd.gz (headerless, 8 columns)
  • ancIBD: directories with ch_all.tsv/ch*.tsv or files *.tsv / *.tsv.gz with ancIBD schema