snputils.snp.io

1from .read import SNPReader, BEDReader, PGENReader, VCFReader, read_snp, read_bed, read_pgen, read_vcf
2from .write import BEDWriter, PGENWriter, VCFWriter
3
4__all__ = ['read_snp', 'read_bed', 'read_pgen', 'read_vcf',
5           'SNPReader', 'BEDReader', 'PGENReader', 'VCFReader',
6           'BEDWriter', 'PGENWriter', 'VCFWriter']
def read_snp(filename: Union[str, pathlib.Path], **kwargs) -> SNPObject:
    """
    Automatically detect the file format and read it into a SNPObject.

    Args:
        filename: Filename of the file to read.
        **kwargs: Additional arguments passed to the reader method.

    Raises:
        ValueError: If the filename does not have an extension or the extension is not supported.
    """
    # Deferred import: the auto-detecting reader pulls in format-specific backends.
    from snputils.snp.io.read.auto import SNPReader

    reader = SNPReader(filename)
    return reader.read(**kwargs)

Automatically detect the file format and read it into a SNPObject.

Arguments:
  • filename: Filename of the file to read.
  • **kwargs: Additional arguments passed to the reader method.
Raises:
  • ValueError: If the filename does not have an extension or the extension is not supported.
def read_bed(filename: Union[str, pathlib.Path], **kwargs) -> SNPObject:
    """
    Read a BED fileset into a SNPObject.

    Args:
        filename: Filename of the BED fileset to read.
        **kwargs: Additional arguments passed to the reader method. See :class:`snputils.snp.io.read.bed.BEDReader` for possible parameters.
    """
    # Deferred import keeps module import time low.
    from snputils.snp.io.read.bed import BEDReader

    reader = BEDReader(filename)
    return reader.read(**kwargs)

Read a BED fileset into a SNPObject.

Arguments:
  • filename: Filename of the BED fileset to read.
  • **kwargs: Additional arguments passed to the reader method. See snputils.snp.io.read.bed.BEDReader for possible parameters.
def read_pgen(filename: Union[str, pathlib.Path], **kwargs) -> SNPObject:
    """
    Read a PGEN fileset into a SNPObject.

    Args:
        filename: Filename of the PGEN fileset to read.
        **kwargs: Additional arguments passed to the reader method. See :class:`snputils.snp.io.read.pgen.PGENReader` for possible parameters.
    """
    # Deferred import keeps module import time low.
    from snputils.snp.io.read.pgen import PGENReader

    reader = PGENReader(filename)
    return reader.read(**kwargs)

Read a PGEN fileset into a SNPObject.

Arguments:
  • filename: Filename of the PGEN fileset to read.
  • **kwargs: Additional arguments passed to the reader method. See snputils.snp.io.read.pgen.PGENReader for possible parameters.
def read_vcf(filename: Union[str, pathlib.Path],
             backend: str = 'polars',
             **kwargs) -> SNPObject:
    """
    Read a VCF fileset into a SNPObject.

    Args:
        filename: Filename of the VCF fileset to read.
        backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'.
        **kwargs: Additional arguments passed to the reader method. See :class:`snputils.snp.io.read.vcf.VCFReader` for possible parameters.

    Raises:
        ValueError: If the backend is not supported.
    """
    from snputils.snp.io.read.vcf import VCFReader, VCFReaderPolars

    if backend == 'polars':
        # Restored filename interpolation (the f-strings previously had no placeholder).
        print(f"Reading {filename} with polars backend")
        return VCFReaderPolars(filename).read(**kwargs)
    elif backend == 'scikit-allel':
        print(f"Reading {filename} with scikit-allel backend")
        return VCFReader(filename).read(**kwargs)
    else:
        # Reject misspelled backends instead of silently falling back to
        # scikit-allel — consistent with SNPReader's vcf_backend handling.
        raise ValueError(f"VCF backend not supported: {backend}")

Read a VCF fileset into a SNPObject.

Arguments:
  • filename: Filename of the VCF fileset to read.
  • backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'.
  • **kwargs: Additional arguments passed to the reader method. See snputils.snp.io.read.vcf.VCFReader for possible parameters.
class SNPReader:
    """Factory that returns a format-specific SNP reader based on the file extension."""

    def __new__(cls,
                filename: Union[str, pathlib.Path],
                vcf_backend: str = 'polars') -> "SNPReader":
        """
        Automatically detect the SNP file format from the file extension, and return its corresponding reader.

        Args:
            filename: Filename of the file to read.
            vcf_backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. Default is 'polars'.

        Raises:
            ValueError: If the filename does not have an extension or the extension is not supported.
        """
        filename = pathlib.Path(filename)
        suffixes = filename.suffixes
        if not suffixes:
            raise ValueError("The filename should have an extension when using SNPReader.")

        # For compressed filesets (.zst/.gz) the logical format is the previous suffix,
        # e.g. "sample.vcf.gz" -> ".vcf". The len() guard makes a bare ".gz"/".zst"
        # filename fall through to the unsupported-format error instead of IndexError.
        if len(suffixes) >= 2 and suffixes[-1].lower() in (".zst", ".gz"):
            extension = suffixes[-2]
        else:
            extension = suffixes[-1]
        extension = extension.lower()

        if extension == ".vcf":
            if vcf_backend == 'polars':
                from snputils.snp.io.read.vcf import VCFReaderPolars

                return VCFReaderPolars(filename)
            elif vcf_backend == 'scikit-allel':
                from snputils.snp.io.read.vcf import VCFReader

                return VCFReader(filename)
            else:
                raise ValueError(f"VCF backend not supported: {vcf_backend}")
        elif extension in (".bed", ".bim", ".fam"):
            from snputils.snp.io.read.bed import BEDReader

            return BEDReader(filename)
        elif extension in (".pgen", ".pvar", ".psam", ".pvar.zst"):
            # ".pvar.zst" is kept for backward compatibility, although a single
            # suffix can never equal it (compressed .pvar already reduces to ".pvar").
            from snputils.snp.io.read.pgen import PGENReader

            return PGENReader(filename)
        else:
            # Include the offending extension in the message (the f-string previously
            # had no placeholder).
            raise ValueError(f"File format not supported: {extension}")
def __new__(cls,
            filename: Union[str, pathlib.Path],
            vcf_backend: str = 'polars') -> SNPReader:
    """
    Automatically detect the SNP file format from the file extension, and return its corresponding reader.

    Args:
        filename: Filename of the file to read.
        vcf_backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. Default is 'polars'.

    Raises:
        ValueError: If the filename does not have an extension or the extension is not supported.
    """
    filename = pathlib.Path(filename)
    suffixes = filename.suffixes
    if not suffixes:
        raise ValueError("The filename should have an extension when using SNPReader.")

    # For compressed files (.zst/.gz) the logical format is the second-to-last
    # suffix, e.g. "sample.vcf.gz" -> ".vcf".
    extension = suffixes[-2] if suffixes[-1].lower() in (".zst", ".gz") else suffixes[-1]
    extension = extension.lower()

    # Format-specific readers are imported lazily, only for the matched branch.
    if extension == ".vcf":
        if vcf_backend == 'polars':
            from snputils.snp.io.read.vcf import VCFReaderPolars

            return VCFReaderPolars(filename)
        elif vcf_backend == 'scikit-allel':
            from snputils.snp.io.read.vcf import VCFReader

            return VCFReader(filename)
        else:
            raise ValueError(f"VCF backend not supported: {vcf_backend}")
    elif extension in (".bed", ".bim", ".fam"):
        from snputils.snp.io.read.bed import BEDReader

        return BEDReader(filename)
    elif extension in (".pgen", ".pvar", ".psam", ".pvar.zst"):
        # NOTE(review): ".pvar.zst" can never match — the .zst branch above already
        # reduces such names to ".pvar"; confirm whether this entry is intentional.
        from snputils.snp.io.read.pgen import PGENReader

        return PGENReader(filename)
    else:
        # NOTE(review): "(unknown)" reads like a lost interpolation (presumably the
        # offending extension was formatted here) — confirm against upstream source.
        raise ValueError(f"File format not supported: (unknown)")

Automatically detect the SNP file format from the file extension, and return its corresponding reader.

Arguments:
  • filename: Filename of the file to read.
  • vcf_backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. Default is 'polars'.
Raises:
  • ValueError: If the filename does not have an extension or the extension is not supported.
@SNPBaseReader.register
class BEDReader(SNPBaseReader):
    # Reader for PLINK 1 filesets (.bed genotype matrix plus .bim/.fam metadata).
    def read(
        self,
        fields: Optional[List[str]] = None,
        exclude_fields: Optional[List[str]] = None,
        sample_ids: Optional[np.ndarray] = None,
        sample_idxs: Optional[np.ndarray] = None,
        variant_ids: Optional[np.ndarray] = None,
        variant_idxs: Optional[np.ndarray] = None,
        sum_strands: bool = False,
        separator: Optional[str] = None,
    ) -> SNPObject:
        """
        Read a bed fileset (bed, bim, fam) into a SNPObject.

        Args:
            fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject.
                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'.
                To extract all fields, set fields to None. Defaults to None.
            exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject.
                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'.
                To exclude no fields, set exclude_fields to None. Defaults to None.
            sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
            sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
            variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
            variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
            sum_strands: If True, maternal and paternal strands are combined into a single `int8` array with values `{0, 1, 2}`.
                If False, strands are stored separately as an `int8` array with values `{0, 1}` for each strand.
                Note: With the pgenlib backend, `False` uses `~8×` more RAM, though `calldata_gt` is only `2×` larger.
            separator: Separator used in the bim file. If None, the separator is automatically detected.
                If the automatic detection fails, please specify the separator manually.

        Returns:
            **SNPObject**:
                A SNPObject instance.
        """
        assert (
            sample_idxs is None or sample_ids is None
        ), "Only one of sample_idxs and sample_ids can be specified"
        assert (
            variant_idxs is None or variant_ids is None
        ), "Only one of variant_idxs and variant_ids can be specified"

        # Accept a bare string for (exclude_)fields by promoting it to a list.
        if isinstance(fields, str):
            fields = [fields]
        if isinstance(exclude_fields, str):
            exclude_fields = [exclude_fields]

        fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS"]
        exclude_fields = exclude_fields or []
        fields = [field for field in fields if field not in exclude_fields]
        # Fast path: genotypes only, with no subsetting -> skip parsing .bim/.fam tables.
        only_read_bed = fields == ["GT"] and variant_idxs is None and sample_idxs is None

        # Normalize the fileset prefix: strip a trailing .bed/.bim/.fam if present.
        filename_noext = str(self.filename)
        if filename_noext[-4:].lower() in (".bed", ".bim", ".fam"):
            filename_noext = filename_noext[:-4]

        if only_read_bed:
            with open(filename_noext + '.fam', 'r') as f:
                file_num_samples = sum(1 for _ in f)  # Get sample count from fam file
            file_num_variants = None  # Not needed
        else:
            log.info(f"Reading {filename_noext}.bim")

            # Detect the column delimiter from the first .bim line when not given.
            if separator is None:
                with open(filename_noext + ".bim", "r") as file:
                    separator = csv.Sniffer().sniff(file.readline()).delimiter

            bim = pl.read_csv(
                filename_noext + ".bim",
                separator=separator,
                has_header=False,
                new_columns=["#CHROM", "ID", "CM", "POS", "ALT", "REF"],
                schema_overrides={
                    "#CHROM": pl.String,
                    "ID": pl.String,
                    "CM": pl.Float64,
                    "POS": pl.Int64,
                    "ALT": pl.String,
                    "REF": pl.String
                },
                null_values=["NA"]
            ).with_row_index()  # the row index doubles as the positional variant index
            file_num_variants = bim.height

            # Translate variant IDs into positional indices.
            if variant_ids is not None:
                variant_idxs = bim.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy()

            if variant_idxs is None:
                num_variants = file_num_variants
                variant_idxs = np.arange(num_variants, dtype=np.uint32)
            else:
                num_variants = np.size(variant_idxs)
                variant_idxs = np.array(variant_idxs, dtype=np.uint32)
                bim = bim.filter(pl.col("index").is_in(variant_idxs))

            log.info(f"Reading {filename_noext}.fam")

            fam = pl.read_csv(
                filename_noext + ".fam",
                separator=separator,
                has_header=False,
                new_columns=["Family ID", "IID", "Father ID",
                             "Mother ID", "Sex code", "Phenotype value"],
                schema_overrides={
                    "Family ID": pl.String,
                    "IID": pl.String,
                    "Father ID": pl.String,
                    "Mother ID": pl.String,
                    "Sex code": pl.String,
                },
                null_values=["NA"]
            ).with_row_index()
            file_num_samples = fam.height

            # Translate sample IDs into positional indices.
            if sample_ids is not None:
                sample_idxs = fam.filter(pl.col("IID").is_in(sample_ids)).select("index").to_series().to_numpy()

            if sample_idxs is None:
                num_samples = file_num_samples
            else:
                num_samples = np.size(sample_idxs)
                sample_idxs = np.array(sample_idxs, dtype=np.uint32)
                fam = fam.filter(pl.col("index").is_in(sample_idxs))

        if "GT" in fields:
            log.info(f"Reading {filename_noext}.bed")
            # pgenlib's PgenReader reads PLINK 1 .bed files directly.
            pgen_reader = pg.PgenReader(
                str.encode(filename_noext + ".bed"),
                raw_sample_ct=file_num_samples,
                variant_ct=file_num_variants,
                sample_subset=sample_idxs,
            )

            if only_read_bed:
                # Counts were not loaded from .bim/.fam on the fast path; take them
                # from the reader itself.
                num_samples = pgen_reader.get_raw_sample_ct()
                num_variants = pgen_reader.get_variant_ct()
                variant_idxs = np.arange(num_variants, dtype=np.uint32)

            # required arrays: variant_idxs + sample_idxs + genotypes
            if not sum_strands:
                required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4
            else:
                required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples
            log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each")

            if not sum_strands:
                genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32)  # cannot use int8 because of pgenlib
                pgen_reader.read_alleles_list(variant_idxs, genotypes)
                # Downcast and split the interleaved alleles into (variant, sample, strand).
                genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2))
            else:
                genotypes = np.empty((num_variants, num_samples), dtype=np.int8)
                pgen_reader.read_list(variant_idxs, genotypes)
            pgen_reader.close()
        else:
            genotypes = None

        log.info("Constructing SNPObject")

        # NOTE(review): on the only_read_bed fast path `fam`/`bim` are never bound;
        # this is safe only because "IID" and the variant fields are then absent from
        # `fields`, short-circuiting the conditions below before touching fam/bim.
        snpobj = SNPObject(
            calldata_gt=genotypes if "GT" in fields else None,
            samples=fam.get_column("IID").to_numpy() if "IID" in fields and "IID" in fam.columns else None,
            **{f'variants_{k.lower()}': bim.get_column(v).to_numpy() if v in fields and v in bim.columns else None
               for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS'}.items()}
        )

        log.info("Finished constructing SNPObject")
        return snpobj

Abstract class for SNP readers.

Attributes:
  • _filename: The path to the file storing SNP data.
def read(
    self,
    fields: Optional[List[str]] = None,
    exclude_fields: Optional[List[str]] = None,
    sample_ids: Optional[np.ndarray] = None,
    sample_idxs: Optional[np.ndarray] = None,
    variant_ids: Optional[np.ndarray] = None,
    variant_idxs: Optional[np.ndarray] = None,
    sum_strands: bool = False,
    separator: Optional[str] = None,
) -> SNPObject:
    """
    Read a bed fileset (bed, bim, fam) into a SNPObject.

    Args:
        fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject.
            Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'.
            To extract all fields, set fields to None. Defaults to None.
        exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject.
            Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'.
            To exclude no fields, set exclude_fields to None. Defaults to None.
        sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
        sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
        variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
        variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
        sum_strands: If True, maternal and paternal strands are combined into a single `int8` array with values `{0, 1, 2}`.
            If False, strands are stored separately as an `int8` array with values `{0, 1}` for each strand.
            Note: With the pgenlib backend, `False` uses `~8×` more RAM, though `calldata_gt` is only `2×` larger.
        separator: Separator used in the bim file. If None, the separator is automatically detected.
            If the automatic detection fails, please specify the separator manually.

    Returns:
        **SNPObject**:
            A SNPObject instance.
    """
    assert (
        sample_idxs is None or sample_ids is None
    ), "Only one of sample_idxs and sample_ids can be specified"
    assert (
        variant_idxs is None or variant_ids is None
    ), "Only one of variant_idxs and variant_ids can be specified"

    # Accept a bare string for (exclude_)fields by promoting it to a list.
    if isinstance(fields, str):
        fields = [fields]
    if isinstance(exclude_fields, str):
        exclude_fields = [exclude_fields]

    fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS"]
    exclude_fields = exclude_fields or []
    fields = [field for field in fields if field not in exclude_fields]
    # Fast path: genotypes only, with no subsetting -> skip parsing .bim/.fam tables.
    only_read_bed = fields == ["GT"] and variant_idxs is None and sample_idxs is None

    # Normalize the fileset prefix: strip a trailing .bed/.bim/.fam if present.
    filename_noext = str(self.filename)
    if filename_noext[-4:].lower() in (".bed", ".bim", ".fam"):
        filename_noext = filename_noext[:-4]

    if only_read_bed:
        with open(filename_noext + '.fam', 'r') as f:
            file_num_samples = sum(1 for _ in f)  # Get sample count from fam file
        file_num_variants = None  # Not needed
    else:
        log.info(f"Reading {filename_noext}.bim")

        # Detect the column delimiter from the first .bim line when not given.
        if separator is None:
            with open(filename_noext + ".bim", "r") as file:
                separator = csv.Sniffer().sniff(file.readline()).delimiter

        bim = pl.read_csv(
            filename_noext + ".bim",
            separator=separator,
            has_header=False,
            new_columns=["#CHROM", "ID", "CM", "POS", "ALT", "REF"],
            schema_overrides={
                "#CHROM": pl.String,
                "ID": pl.String,
                "CM": pl.Float64,
                "POS": pl.Int64,
                "ALT": pl.String,
                "REF": pl.String
            },
            null_values=["NA"]
        ).with_row_index()  # the row index doubles as the positional variant index
        file_num_variants = bim.height

        # Translate variant IDs into positional indices.
        if variant_ids is not None:
            variant_idxs = bim.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy()

        if variant_idxs is None:
            num_variants = file_num_variants
            variant_idxs = np.arange(num_variants, dtype=np.uint32)
        else:
            num_variants = np.size(variant_idxs)
            variant_idxs = np.array(variant_idxs, dtype=np.uint32)
            bim = bim.filter(pl.col("index").is_in(variant_idxs))

        log.info(f"Reading {filename_noext}.fam")

        fam = pl.read_csv(
            filename_noext + ".fam",
            separator=separator,
            has_header=False,
            new_columns=["Family ID", "IID", "Father ID",
                         "Mother ID", "Sex code", "Phenotype value"],
            schema_overrides={
                "Family ID": pl.String,
                "IID": pl.String,
                "Father ID": pl.String,
                "Mother ID": pl.String,
                "Sex code": pl.String,
            },
            null_values=["NA"]
        ).with_row_index()
        file_num_samples = fam.height

        # Translate sample IDs into positional indices.
        if sample_ids is not None:
            sample_idxs = fam.filter(pl.col("IID").is_in(sample_ids)).select("index").to_series().to_numpy()

        if sample_idxs is None:
            num_samples = file_num_samples
        else:
            num_samples = np.size(sample_idxs)
            sample_idxs = np.array(sample_idxs, dtype=np.uint32)
            fam = fam.filter(pl.col("index").is_in(sample_idxs))

    if "GT" in fields:
        log.info(f"Reading {filename_noext}.bed")
        # pgenlib's PgenReader reads PLINK 1 .bed files directly.
        pgen_reader = pg.PgenReader(
            str.encode(filename_noext + ".bed"),
            raw_sample_ct=file_num_samples,
            variant_ct=file_num_variants,
            sample_subset=sample_idxs,
        )

        if only_read_bed:
            # Counts were not loaded from .bim/.fam on the fast path; take them
            # from the reader itself.
            num_samples = pgen_reader.get_raw_sample_ct()
            num_variants = pgen_reader.get_variant_ct()
            variant_idxs = np.arange(num_variants, dtype=np.uint32)

        # required arrays: variant_idxs + sample_idxs + genotypes
        if not sum_strands:
            required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4
        else:
            required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples
        log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each")

        if not sum_strands:
            genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32)  # cannot use int8 because of pgenlib
            pgen_reader.read_alleles_list(variant_idxs, genotypes)
            # Downcast and split the interleaved alleles into (variant, sample, strand).
            genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2))
        else:
            genotypes = np.empty((num_variants, num_samples), dtype=np.int8)
            pgen_reader.read_list(variant_idxs, genotypes)
        pgen_reader.close()
    else:
        genotypes = None

    log.info("Constructing SNPObject")

    # NOTE(review): on the only_read_bed fast path `fam`/`bim` are never bound;
    # this is safe only because "IID" and the variant fields are then absent from
    # `fields`, short-circuiting the conditions below before touching fam/bim.
    snpobj = SNPObject(
        calldata_gt=genotypes if "GT" in fields else None,
        samples=fam.get_column("IID").to_numpy() if "IID" in fields and "IID" in fam.columns else None,
        **{f'variants_{k.lower()}': bim.get_column(v).to_numpy() if v in fields and v in bim.columns else None
           for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS'}.items()}
    )

    log.info("Finished constructing SNPObject")
    return snpobj

Read a bed fileset (bed, bim, fam) into a SNPObject.

Arguments:
  • fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. To extract all fields, set fields to None. Defaults to None.
  • exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. To exclude no fields, set exclude_fields to None. Defaults to None.
  • sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
  • sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
  • variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
  • variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
  • sum_strands: If True, maternal and paternal strands are combined into a single int8 array with values {0, 1, 2}. If False, strands are stored separately as an int8 array with values {0, 1} for each strand. Note: With the pgenlib backend, False uses ~8× more RAM, though calldata_gt is only 2× larger.
  • separator: Separator used in the pvar file. If None, the separator is automatically detected. If the automatic detection fails, please specify the separator manually.
Returns:

SNPObject: A SNPObject instance.

@SNPBaseReader.register
class PGENReader(snputils.snp.io.read.base.SNPBaseReader):
 17@SNPBaseReader.register
 18class PGENReader(SNPBaseReader):
 19    def read(
 20        self,
 21        fields: Optional[List[str]] = None,
 22        exclude_fields: Optional[List[str]] = None,
 23        sample_ids: Optional[np.ndarray] = None,
 24        sample_idxs: Optional[np.ndarray] = None,
 25        variant_ids: Optional[np.ndarray] = None,
 26        variant_idxs: Optional[np.ndarray] = None,
 27        sum_strands: bool = False,
 28        separator: str = None,
 29    ) -> SNPObject:
 30        """
 31        Read a pgen fileset (pgen, psam, pvar) into a SNPObject.
 32
 33        Args:
 34            fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject.
 35                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'.
 36                To extract all fields, set fields to None. Defaults to None.
 37            exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject.
 38                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'.
 39                To exclude no fields, set exclude_fields to None. Defaults to None.
 40            sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
 41            sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
 42            variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
 43            variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
 44            sum_strands: If True, maternal and paternal strands are combined into a single `int8` array with values `{0, 1, 2}`.
 45                If False, strands are stored separately as an `int8` array with values `{0, 1}` for each strand.
 46                Note: With the pgenlib backend, `False` uses `~8×` more RAM, though `calldata_gt` is only `2×` larger.
 47            separator: Separator used in the pvar file. If None, the separator is automatically detected.
 48                If the automatic detection fails, please specify the separator manually.
 49
 50        Returns:
 51            **SNPObject**: 
 52                A SNPObject instance.
 53        """
 54        assert (
 55            sample_idxs is None or sample_ids is None
 56        ), "Only one of sample_idxs and sample_ids can be specified"
 57        assert (
 58            variant_idxs is None or variant_ids is None
 59        ), "Only one of variant_idxs and variant_ids can be specified"
 60
 61        if isinstance(fields, str):
 62            fields = [fields]
 63        if isinstance(exclude_fields, str):
 64            exclude_fields = [exclude_fields]
 65
 66        fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS", "FILTER", "QUAL"]
 67        exclude_fields = exclude_fields or []
 68        fields = [field for field in fields if field not in exclude_fields]
 69        only_read_pgen = fields == ["GT"] and variant_idxs is None and sample_idxs is None
 70
 71        filename_noext = str(self.filename)
 72        for ext in [".pgen", ".pvar", ".pvar.zst", ".psam"]:
 73            if filename_noext.endswith(ext):
 74                filename_noext = filename_noext[:-len(ext)]
 75                break
 76
 77        if only_read_pgen:
 78            file_num_samples = None  # Not needed for pgen
 79            file_num_variants = None  # Not needed
 80        else:
 81            pvar_extensions = [".pvar", ".pvar.zst"]
 82            pvar_filename = None
 83            for ext in pvar_extensions:
 84                possible_pvar = filename_noext + ext
 85                if os.path.exists(possible_pvar):
 86                    pvar_filename = possible_pvar
 87                    break
 88            if pvar_filename is None:
 89                raise FileNotFoundError(f"No .pvar or .pvar.zst file found for {filename_noext}")
 90
 91            log.info(f"Reading {pvar_filename}")
 92
 93            def open_textfile(filename):
 94                if filename.endswith('.zst'):
 95                    import zstandard as zstd
 96                    return zstd.open(filename, 'rt')
 97                else:
 98                    return open(filename, 'rt')
 99
100            pvar_has_header = True
101            pvar_header_line_num = 0
102            with open_textfile(pvar_filename) as file:
103                for line_num, line in enumerate(file):
104                    if line.startswith("##"):  # Metadata
105                        continue
106                    else:
107                        if separator is None:
108                            separator = csv.Sniffer().sniff(file.readline()).delimiter
109                        if line.startswith("#CHROM"):  # Header
110                            pvar_header_line_num = line_num
111                            header = line.strip().split()
112                            break
113                        elif not line.startswith("#"):  # If no header, look at line 1
114                            pvar_has_header = False
115                            cols_in_pvar = len(line.strip().split(separator))
116                            if cols_in_pvar == 5:
117                                header = ["#CHROM", "ID", "POS", "ALT", "REF"]
118                            elif cols_in_pvar == 6:
119                                header = ["#CHROM", "ID", "CM", "POS", "ALT", "REF"]
120                            else:
121                                raise ValueError(
122                                    f"{pvar_filename} is not a valid pvar file."
123                                )
124                            break
125
126            pvar_reading_args = {
127                'separator': separator,
128                'skip_rows': pvar_header_line_num,
129                'has_header': pvar_has_header,
130                'new_columns': None if pvar_has_header else header,
131                'schema_overrides': {
132                    "#CHROM": pl.String,
133                    "POS": pl.UInt32,
134                    "ID": pl.String,
135                    "REF": pl.String,
136                    "ALT": pl.String,
137                },
138                'null_values': ["NA"],
139            }
140            if pvar_filename.endswith('.zst'):
141                pvar = pl.read_csv(pvar_filename, **pvar_reading_args).lazy()
142            else:
143                pvar = pl.scan_csv(pvar_filename, **pvar_reading_args)
144
145            # We need to track row positions of variants but doing this with lazy operations would
146            # defeat predicate pushdown optimization and force loading the entire DataFrame.
147            # Solution: We only materialize the ID column with indices, keeping memory usage minimal
148            # while maintaining the ability to map between variant IDs and their file positions.
149            variant_id_idxs = pvar.select("ID").with_row_index().collect()
150
151            file_num_variants = variant_id_idxs.height
152
153            if variant_ids is not None:
154                num_variants = np.size(variant_ids)
155                pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect()
156                variant_idxs = variant_id_idxs.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy()
157            elif variant_idxs is not None:
158                num_variants = np.size(variant_idxs)
159                variant_idxs = np.array(variant_idxs, dtype=np.uint32)
160                variant_ids = variant_id_idxs.filter(pl.col("index").is_in(variant_idxs)).select("ID")
161                pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect()
162            else:
163                num_variants = file_num_variants
164                variant_idxs = np.arange(num_variants, dtype=np.uint32)
165                pvar = pvar.collect()
166
167            log.info(f"Reading {filename_noext}.psam")
168
169            with open(filename_noext + ".psam") as file:
170                first_line = file.readline().strip()
171                psam_has_header = first_line.startswith(("#FID", "FID", "#IID", "IID"))
172
173            psam = pl.read_csv(
174                filename_noext + ".psam",
175                separator=separator,
176                has_header=psam_has_header,
177                new_columns=None if psam_has_header else ["FID", "IID", "PAT", "MAT", "SEX", "PHENO1"],
178                null_values=["NA"],
179            ).with_row_index()
180            if "#IID" in psam.columns:
181                psam = psam.rename({"#IID": "IID"})
182            if "#FID" in psam.columns:
183                psam = psam.rename({"#FID": "FID"})
184
185            file_num_samples = psam.height
186
187            if sample_ids is not None:
188                num_samples = np.size(sample_ids)
189                psam = psam.filter(pl.col("IID").is_in(sample_ids))
190                sample_idxs = psam.select("index").to_series().to_numpy()
191            elif sample_idxs is not None:
192                num_samples = np.size(sample_idxs)
193                sample_idxs = np.array(sample_idxs, dtype=np.uint32)
194                psam = psam.filter(pl.col("index").is_in(sample_idxs))
195            else:
196                num_samples = file_num_samples
197
198        if "GT" in fields:
199            log.info(f"Reading {filename_noext}.pgen")
200            pgen_reader = pg.PgenReader(
201                str.encode(filename_noext + ".pgen"),
202                raw_sample_ct=file_num_samples,
203                variant_ct=file_num_variants,
204                sample_subset=sample_idxs,
205            )
206
207            if only_read_pgen:
208                num_samples = pgen_reader.get_raw_sample_ct()
209                num_variants = pgen_reader.get_variant_ct()
210                variant_idxs = np.arange(num_variants, dtype=np.uint32)
211
212            # required arrays: variant_idxs + sample_idxs + genotypes
213            if not sum_strands:
214                required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4
215            else:
216                required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples
217            log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each")
218
219            if not sum_strands:
220                genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32)  # cannot use int8 because of pgenlib
221                pgen_reader.read_alleles_list(variant_idxs, genotypes)
222                genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2))
223            else:
224                genotypes = np.empty((num_variants, num_samples), dtype=np.int8)
225                pgen_reader.read_list(variant_idxs, genotypes)
226            pgen_reader.close()
227        else:
228            genotypes = None
229
230        log.info("Constructing SNPObject")
231
232        snpobj = SNPObject(
233            calldata_gt=genotypes if "GT" in fields else None,
234            samples=psam.get_column("IID").to_numpy() if "IID" in fields and "IID" in psam.columns else None,
235            **{f'variants_{k.lower()}': pvar.get_column(v).to_numpy() if v in fields and v in pvar.columns else None
236               for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS', 'filter_pass': 'FILTER', 'qual': 'QUAL'}.items()}
237        )
238
239        log.info("Finished constructing SNPObject")
240        return snpobj

Abstract class for SNP readers.

Attributes:
  • _filename: The path to the file storing SNP data.
def read( self, fields: Optional[List[str]] = None, exclude_fields: Optional[List[str]] = None, sample_ids: Optional[numpy.ndarray] = None, sample_idxs: Optional[numpy.ndarray] = None, variant_ids: Optional[numpy.ndarray] = None, variant_idxs: Optional[numpy.ndarray] = None, sum_strands: bool = False, separator: str = None) -> snputils.snp.genobj.SNPObject:
 19    def read(
 20        self,
 21        fields: Optional[List[str]] = None,
 22        exclude_fields: Optional[List[str]] = None,
 23        sample_ids: Optional[np.ndarray] = None,
 24        sample_idxs: Optional[np.ndarray] = None,
 25        variant_ids: Optional[np.ndarray] = None,
 26        variant_idxs: Optional[np.ndarray] = None,
 27        sum_strands: bool = False,
 28        separator: str = None,
 29    ) -> SNPObject:
 30        """
 31        Read a pgen fileset (pgen, psam, pvar) into a SNPObject.
 32
 33        Args:
 34            fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject.
 35                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'.
 36                To extract all fields, set fields to None. Defaults to None.
 37            exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject.
 38                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'.
 39                To exclude no fields, set exclude_fields to None. Defaults to None.
 40            sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
 41            sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
 42            variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
 43            variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
 44            sum_strands: If True, maternal and paternal strands are combined into a single `int8` array with values `{0, 1, 2}`.
 45                If False, strands are stored separately as an `int8` array with values `{0, 1}` for each strand.
 46                Note: With the pgenlib backend, `False` uses `~8×` more RAM, though `calldata_gt` is only `2×` larger.
 47            separator: Separator used in the pvar file. If None, the separator is automatically detected.
 48                If the automatic detection fails, please specify the separator manually.
 49
 50        Returns:
 51            **SNPObject**: 
 52                A SNPObject instance.
 53        """
 54        assert (
 55            sample_idxs is None or sample_ids is None
 56        ), "Only one of sample_idxs and sample_ids can be specified"
 57        assert (
 58            variant_idxs is None or variant_ids is None
 59        ), "Only one of variant_idxs and variant_ids can be specified"
 60
 61        if isinstance(fields, str):
 62            fields = [fields]
 63        if isinstance(exclude_fields, str):
 64            exclude_fields = [exclude_fields]
 65
 66        fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS", "FILTER", "QUAL"]
 67        exclude_fields = exclude_fields or []
 68        fields = [field for field in fields if field not in exclude_fields]
 69        only_read_pgen = fields == ["GT"] and variant_idxs is None and sample_idxs is None
 70
 71        filename_noext = str(self.filename)
 72        for ext in [".pgen", ".pvar", ".pvar.zst", ".psam"]:
 73            if filename_noext.endswith(ext):
 74                filename_noext = filename_noext[:-len(ext)]
 75                break
 76
 77        if only_read_pgen:
 78            file_num_samples = None  # Not needed for pgen
 79            file_num_variants = None  # Not needed
 80        else:
 81            pvar_extensions = [".pvar", ".pvar.zst"]
 82            pvar_filename = None
 83            for ext in pvar_extensions:
 84                possible_pvar = filename_noext + ext
 85                if os.path.exists(possible_pvar):
 86                    pvar_filename = possible_pvar
 87                    break
 88            if pvar_filename is None:
 89                raise FileNotFoundError(f"No .pvar or .pvar.zst file found for {filename_noext}")
 90
 91            log.info(f"Reading {pvar_filename}")
 92
 93            def open_textfile(filename):
 94                if filename.endswith('.zst'):
 95                    import zstandard as zstd
 96                    return zstd.open(filename, 'rt')
 97                else:
 98                    return open(filename, 'rt')
 99
100            pvar_has_header = True
101            pvar_header_line_num = 0
102            with open_textfile(pvar_filename) as file:
103                for line_num, line in enumerate(file):
104                    if line.startswith("##"):  # Metadata
105                        continue
106                    else:
107                        if separator is None:
108                            separator = csv.Sniffer().sniff(file.readline()).delimiter
109                        if line.startswith("#CHROM"):  # Header
110                            pvar_header_line_num = line_num
111                            header = line.strip().split()
112                            break
113                        elif not line.startswith("#"):  # If no header, look at line 1
114                            pvar_has_header = False
115                            cols_in_pvar = len(line.strip().split(separator))
116                            if cols_in_pvar == 5:
117                                header = ["#CHROM", "ID", "POS", "ALT", "REF"]
118                            elif cols_in_pvar == 6:
119                                header = ["#CHROM", "ID", "CM", "POS", "ALT", "REF"]
120                            else:
121                                raise ValueError(
122                                    f"{pvar_filename} is not a valid pvar file."
123                                )
124                            break
125
126            pvar_reading_args = {
127                'separator': separator,
128                'skip_rows': pvar_header_line_num,
129                'has_header': pvar_has_header,
130                'new_columns': None if pvar_has_header else header,
131                'schema_overrides': {
132                    "#CHROM": pl.String,
133                    "POS": pl.UInt32,
134                    "ID": pl.String,
135                    "REF": pl.String,
136                    "ALT": pl.String,
137                },
138                'null_values': ["NA"],
139            }
140            if pvar_filename.endswith('.zst'):
141                pvar = pl.read_csv(pvar_filename, **pvar_reading_args).lazy()
142            else:
143                pvar = pl.scan_csv(pvar_filename, **pvar_reading_args)
144
145            # We need to track row positions of variants but doing this with lazy operations would
146            # defeat predicate pushdown optimization and force loading the entire DataFrame.
147            # Solution: We only materialize the ID column with indices, keeping memory usage minimal
148            # while maintaining the ability to map between variant IDs and their file positions.
149            variant_id_idxs = pvar.select("ID").with_row_index().collect()
150
151            file_num_variants = variant_id_idxs.height
152
153            if variant_ids is not None:
154                num_variants = np.size(variant_ids)
155                pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect()
156                variant_idxs = variant_id_idxs.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy()
157            elif variant_idxs is not None:
158                num_variants = np.size(variant_idxs)
159                variant_idxs = np.array(variant_idxs, dtype=np.uint32)
160                variant_ids = variant_id_idxs.filter(pl.col("index").is_in(variant_idxs)).select("ID")
161                pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect()
162            else:
163                num_variants = file_num_variants
164                variant_idxs = np.arange(num_variants, dtype=np.uint32)
165                pvar = pvar.collect()
166
167            log.info(f"Reading {filename_noext}.psam")
168
169            with open(filename_noext + ".psam") as file:
170                first_line = file.readline().strip()
171                psam_has_header = first_line.startswith(("#FID", "FID", "#IID", "IID"))
172
173            psam = pl.read_csv(
174                filename_noext + ".psam",
175                separator=separator,
176                has_header=psam_has_header,
177                new_columns=None if psam_has_header else ["FID", "IID", "PAT", "MAT", "SEX", "PHENO1"],
178                null_values=["NA"],
179            ).with_row_index()
180            if "#IID" in psam.columns:
181                psam = psam.rename({"#IID": "IID"})
182            if "#FID" in psam.columns:
183                psam = psam.rename({"#FID": "FID"})
184
185            file_num_samples = psam.height
186
187            if sample_ids is not None:
188                num_samples = np.size(sample_ids)
189                psam = psam.filter(pl.col("IID").is_in(sample_ids))
190                sample_idxs = psam.select("index").to_series().to_numpy()
191            elif sample_idxs is not None:
192                num_samples = np.size(sample_idxs)
193                sample_idxs = np.array(sample_idxs, dtype=np.uint32)
194                psam = psam.filter(pl.col("index").is_in(sample_idxs))
195            else:
196                num_samples = file_num_samples
197
198        if "GT" in fields:
199            log.info(f"Reading {filename_noext}.pgen")
200            pgen_reader = pg.PgenReader(
201                str.encode(filename_noext + ".pgen"),
202                raw_sample_ct=file_num_samples,
203                variant_ct=file_num_variants,
204                sample_subset=sample_idxs,
205            )
206
207            if only_read_pgen:
208                num_samples = pgen_reader.get_raw_sample_ct()
209                num_variants = pgen_reader.get_variant_ct()
210                variant_idxs = np.arange(num_variants, dtype=np.uint32)
211
212            # required arrays: variant_idxs + sample_idxs + genotypes
213            if not sum_strands:
214                required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4
215            else:
216                required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples
217            log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each")
218
219            if not sum_strands:
220                genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32)  # cannot use int8 because of pgenlib
221                pgen_reader.read_alleles_list(variant_idxs, genotypes)
222                genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2))
223            else:
224                genotypes = np.empty((num_variants, num_samples), dtype=np.int8)
225                pgen_reader.read_list(variant_idxs, genotypes)
226            pgen_reader.close()
227        else:
228            genotypes = None
229
230        log.info("Constructing SNPObject")
231
232        snpobj = SNPObject(
233            calldata_gt=genotypes if "GT" in fields else None,
234            samples=psam.get_column("IID").to_numpy() if "IID" in fields and "IID" in psam.columns else None,
235            **{f'variants_{k.lower()}': pvar.get_column(v).to_numpy() if v in fields and v in pvar.columns else None
236               for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS', 'filter_pass': 'FILTER', 'qual': 'QUAL'}.items()}
237        )
238
239        log.info("Finished constructing SNPObject")
240        return snpobj

Read a pgen fileset (pgen, psam, pvar) into a SNPObject.

Arguments:
  • fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. To extract all fields, set fields to None. Defaults to None.
  • exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. To exclude no fields, set exclude_fields to None. Defaults to None.
  • sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
  • sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
  • variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
  • variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
  • sum_strands: If True, maternal and paternal strands are combined into a single int8 array with values {0, 1, 2}. If False, strands are stored separately as an int8 array with values {0, 1} for each strand. Note: With the pgenlib backend, False uses ~8× more RAM, though calldata_gt is only 2× larger.
  • separator: Separator used in the pvar file. If None, the separator is automatically detected. If the automatic detection fails, please specify the separator manually.
Returns:

SNPObject: A SNPObject instance.

@SNPBaseReader.register
class VCFReader(snputils.snp.io.read.base.SNPBaseReader):
18@SNPBaseReader.register
19class VCFReader(SNPBaseReader):
20    def read(
21        self,
22        fields: Optional[List[str]] = None,
23        exclude_fields: Optional[List[str]] = None,
24        rename_fields: Optional[dict] = None,
25        fills: Optional[dict] = None,
26        region: Optional[str] = None,
27        samples: Optional[List[str]] = None,
28        sum_strands: bool = False,
29    ) -> SNPObject:
30        """
31        Read a vcf file into a SNPObject.
32
33        Args:
34            fields: Fields to extract data for. e.g., ['variants/CHROM', 'variants/POS',
35                'calldata/GT']. If you are feeling lazy, you can drop the 'variants/'
36                and 'calldata/' prefixes, in which case the fields will be matched
37                against fields declared in the VCF header, with variants taking priority
38                over calldata if a field with the same ID exists both in INFO and FORMAT
39                headers. I.e., ['CHROM', 'POS', 'DP', 'GT'] will work, although watch out
40                for fields like 'DP' which can be both INFO and FORMAT. To extract all
41                fields, provide just the string '*'. To extract all variants fields
42                (including all INFO fields) provide 'variants/*'. To extract all
43                calldata fields (i.e., defined in FORMAT headers) provide 'calldata/*'.
44            exclude_fields: Fields to exclude. E.g., for use in combination with fields='*'.
45            rename_fields: Fields to be renamed. Should be a dictionary mapping old to new names.
46            fills: Override the fill value used for empty values. Should be a dictionary
47                mapping field names to fill values.
48            region: Genomic region to extract variants for. If provided, should be a
49                tabix-style region string, which can be either just a chromosome name
50                (e.g., '2L'), or a chromosome name followed by 1-based beginning and
51                end coordinates (e.g., '2L:100000-200000'). Note that only variants
52                whose start position (POS) is within the requested range will be included.
53                This is slightly different from the default tabix behaviour, where a
54                variant (e.g., deletion) may be included if its position (POS) occurs
55                before the requested region but its reference allele overlaps the
56                region - such a variant will not be included in the data returned
57                by this function.
58            samples: Selection of samples to extract calldata for. If provided, should be
59                a list of strings giving sample identifiers. May also be a list of
60                integers giving indices of selected samples.
 61            sum_strands: If True, maternal and paternal strands are combined into a single `int8` array with values `{0, 1, 2}`.
 62                If False, strands are stored separately as an `int8` array with values `{0, 1}` for each strand.
63
64        Returns:
65            **SNPObject**: 
66                A SNPObject instance.
67        """
68        log.info(f"Reading {self.filename}")
69
70        vcf_dict = allel.read_vcf(
71            str(self.filename),
72            fields=fields,
73            exclude_fields=exclude_fields,
74            rename_fields=rename_fields,
75            fills=fills,
76            region=region,
77            samples=samples,
78            alt_number=1,
79        )
80        assert vcf_dict is not None  # suppress Flake8 warning
81
82        genotypes = vcf_dict["calldata/GT"].astype(np.int8)
83        if sum_strands:
84            genotypes = genotypes.sum(axis=2, dtype=np.int8)
85
86        snpobj = SNPObject(
87            calldata_gt=genotypes,
88            samples=vcf_dict["samples"],
89            variants_ref=vcf_dict["variants/REF"],
90            variants_alt=vcf_dict["variants/ALT"],
91            variants_chrom=vcf_dict["variants/CHROM"],
92            variants_filter_pass=vcf_dict["variants/FILTER_PASS"],
93            variants_id=vcf_dict["variants/ID"],
94            variants_pos=vcf_dict["variants/POS"],
95            variants_qual=vcf_dict["variants/QUAL"],
96        )
97
98        log.info(f"Finished reading {self.filename}")
99        return snpobj

Abstract class for SNP readers.

Attributes:
  • _filename: The path to the file storing SNP data.
def read( self, fields: Optional[List[str]] = None, exclude_fields: Optional[List[str]] = None, rename_fields: Optional[dict] = None, fills: Optional[dict] = None, region: Optional[str] = None, samples: Optional[List[str]] = None, sum_strands: bool = False) -> snputils.snp.genobj.SNPObject:
20    def read(
21        self,
22        fields: Optional[List[str]] = None,
23        exclude_fields: Optional[List[str]] = None,
24        rename_fields: Optional[dict] = None,
25        fills: Optional[dict] = None,
26        region: Optional[str] = None,
27        samples: Optional[List[str]] = None,
28        sum_strands: bool = False,
29    ) -> SNPObject:
30        """
31        Read a vcf file into a SNPObject.
32
33        Args:
34            fields: Fields to extract data for. e.g., ['variants/CHROM', 'variants/POS',
35                'calldata/GT']. If you are feeling lazy, you can drop the 'variants/'
36                and 'calldata/' prefixes, in which case the fields will be matched
37                against fields declared in the VCF header, with variants taking priority
38                over calldata if a field with the same ID exists both in INFO and FORMAT
39                headers. I.e., ['CHROM', 'POS', 'DP', 'GT'] will work, although watch out
40                for fields like 'DP' which can be both INFO and FORMAT. To extract all
41                fields, provide just the string '*'. To extract all variants fields
42                (including all INFO fields) provide 'variants/*'. To extract all
43                calldata fields (i.e., defined in FORMAT headers) provide 'calldata/*'.
44            exclude_fields: Fields to exclude. E.g., for use in combination with fields='*'.
45            rename_fields: Fields to be renamed. Should be a dictionary mapping old to new names.
46            fills: Override the fill value used for empty values. Should be a dictionary
47                mapping field names to fill values.
48            region: Genomic region to extract variants for. If provided, should be a
49                tabix-style region string, which can be either just a chromosome name
50                (e.g., '2L'), or a chromosome name followed by 1-based beginning and
51                end coordinates (e.g., '2L:100000-200000'). Note that only variants
52                whose start position (POS) is within the requested range will be included.
53                This is slightly different from the default tabix behaviour, where a
54                variant (e.g., deletion) may be included if its position (POS) occurs
55                before the requested region but its reference allele overlaps the
56                region - such a variant will not be included in the data returned
57                by this function.
58            samples: Selection of samples to extract calldata for. If provided, should be
59                a list of strings giving sample identifiers. May also be a list of
60                integers giving indices of selected samples.
61            sum_strands: If True, maternal and paternal strands are combined into a single `int8` array with values `{0, 1, 2`}. 
62                If False, strands are stored separately as an `int8` array with values `{0, 1}` for each strand.
63
64        Returns:
65            **SNPObject**: 
66                A SNPObject instance.
67        """
68        log.info(f"Reading {self.filename}")
69
70        vcf_dict = allel.read_vcf(
71            str(self.filename),
72            fields=fields,
73            exclude_fields=exclude_fields,
74            rename_fields=rename_fields,
75            fills=fills,
76            region=region,
77            samples=samples,
78            alt_number=1,
79        )
80        assert vcf_dict is not None  # suppress Flake8 warning
81
82        genotypes = vcf_dict["calldata/GT"].astype(np.int8)
83        if sum_strands:
84            genotypes = genotypes.sum(axis=2, dtype=np.int8)
85
86        snpobj = SNPObject(
87            calldata_gt=genotypes,
88            samples=vcf_dict["samples"],
89            variants_ref=vcf_dict["variants/REF"],
90            variants_alt=vcf_dict["variants/ALT"],
91            variants_chrom=vcf_dict["variants/CHROM"],
92            variants_filter_pass=vcf_dict["variants/FILTER_PASS"],
93            variants_id=vcf_dict["variants/ID"],
94            variants_pos=vcf_dict["variants/POS"],
95            variants_qual=vcf_dict["variants/QUAL"],
96        )
97
98        log.info(f"Finished reading {self.filename}")
99        return snpobj

Read a vcf file into a SNPObject.

Arguments:
  • fields: Fields to extract data for. e.g., ['variants/CHROM', 'variants/POS', 'calldata/GT']. If you are feeling lazy, you can drop the 'variants/' and 'calldata/' prefixes, in which case the fields will be matched against fields declared in the VCF header, with variants taking priority over calldata if a field with the same ID exists both in INFO and FORMAT headers. I.e., ['CHROM', 'POS', 'DP', 'GT'] will work, although watch out for fields like 'DP' which can be both INFO and FORMAT. To extract all fields, provide just the string '*'. To extract all variants fields (including all INFO fields) provide 'variants/*'. To extract all calldata fields (i.e., defined in FORMAT headers) provide 'calldata/*'.
  • exclude_fields: Fields to exclude. E.g., for use in combination with fields='*'.
  • rename_fields: Fields to be renamed. Should be a dictionary mapping old to new names.
  • fills: Override the fill value used for empty values. Should be a dictionary mapping field names to fill values.
  • region: Genomic region to extract variants for. If provided, should be a tabix-style region string, which can be either just a chromosome name (e.g., '2L'), or a chromosome name followed by 1-based beginning and end coordinates (e.g., '2L:100000-200000'). Note that only variants whose start position (POS) is within the requested range will be included. This is slightly different from the default tabix behaviour, where a variant (e.g., deletion) may be included if its position (POS) occurs before the requested region but its reference allele overlaps the region - such a variant will not be included in the data returned by this function.
  • samples: Selection of samples to extract calldata for. If provided, should be a list of strings giving sample identifiers. May also be a list of integers giving indices of selected samples.
  • sum_strands: If True, maternal and paternal strands are combined into a single int8 array with values {0, 1, 2}. If False, strands are stored separately as an int8 array with values {0, 1} for each strand.
Returns:

SNPObject: A SNPObject instance.

class BEDWriter:
 14class BEDWriter:
 15    """Writes an object in bed/bim/fam formats in the specified output path.
 16
 17    Args:
 18        snpobj: The SNPObject to be written.
 19        file: The output file path.
 20
 21    """
 22
 23    def __init__(self, snpobj: SNPObject, filename: str):
 24        self.__snpobj = snpobj.copy()
 25        self.__filename = Path(filename)
 26
 27    def write(
 28            self,
 29            rename_missing_values: bool = True, 
 30            before: Union[int, float, str] = -1, 
 31            after: Union[int, float, str] = '.'
 32        ):
 33        """
 34        Writes the SNPObject to bed/bim/fam formats.
 35
 36        Args:
 37            rename_missing_values (bool, optional):
 38                If True, renames potential missing values in `snpobj.calldata_gt` before writing. 
 39                Defaults to True.
 40            before (int, float, or str, default=-1): 
 41                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
 42                Default is -1.
 43            after (int, float, or str, default='.'): 
 44                The value that will replace `before`. Default is '.'.
 45        """
 46        # Save .bed file
 47        if self.__filename.suffix != '.bed':
 48            self.__filename = self.__filename.with_suffix('.bed')
 49
 50        log.info(f"Writing .bed file: {self.__filename}")
 51
 52        # Optionally rename potential missing values in `snpobj.calldata_gt` before writing
 53        if rename_missing_values:
 54            self.__snpobj.rename_missings(before=before, after=after, inplace=True)
 55
 56        # If the input matrix has three dimensions, it indicates that the data is divided into two strands.
 57        if len(self.__snpobj.calldata_gt.shape) == 3:
 58            # Sum the two strands
 59            self.__snpobj.calldata_gt = self.__snpobj.calldata_gt.transpose(1, 0, 2).sum(axis=2, dtype=np.int8)
 60
 61        # Infer the number of samples and variants from the matrix
 62        samples, variants = self.__snpobj.calldata_gt.shape
 63
 64        # Define the PgenWriter to save the data
 65        data_save = pg.PgenWriter(filename=str(self.__filename).encode('utf-8'),
 66                                  sample_ct=samples,
 67                                  variant_ct=variants,
 68                                  nonref_flags=True,
 69                                  hardcall_phase_present=False,
 70                                  dosage_present=True,
 71                                  dosage_phase_present=False)
 72
 73        # Fill the data_save object with the matrix of individuals x variants
 74        for snp_i in range(0, variants):
 75            data_save.append_biallelic(np.ascontiguousarray(self.__snpobj.calldata_gt[:, snp_i]))
 76
 77        # Save the .bed file
 78        data_save.close()
 79
 80        log.info(f"Finished writing .bed file: {self.__filename}")
 81
 82        # Remove .bed from the file name
 83        if self.__filename.suffix == '.bed':
 84            self.__filename = self.__filename.with_suffix('')
 85
 86        # Save .fam file
 87        log.info(f"Writing .fam file: {self.__filename}")
 88
 89        # Fill .fam file
 90        fam_file = pd.DataFrame(columns=['fid', 'iid', 'father', 'mother', 'gender', 'trait'])
 91        fam_file['iid'] = self.__snpobj.samples
 92        fam_file['fid'] = self.__snpobj.samples
 93
 94        # Save .fam file
 95        fam_file.to_csv(self.__filename.with_suffix('.fam'), sep='\t', index=False, header=False)
 96        log.info(f"Finished writing .fam file: {self.__filename}")
 97
 98        # Save .bim file
 99        log.info(f"Writing .bim file: {self.__filename}")
100
101        # Fill .bim file
102        bim_file = pd.DataFrame(columns=['chrom', 'snp', 'cm', 'pos', 'a0', 'a1'])
103        bim_file['chrom'] = self.__snpobj.variants_chrom
104        bim_file['snp'] = self.__snpobj.variants_id
105        bim_file['cm'] = 0  # TODO: read, save and write too if available?
106        log.warning("The .bim file is being saved with 0 cM values.")
107        bim_file['pos'] = self.__snpobj.variants_pos
108        bim_file['a0'] = self.__snpobj.variants_alt
109        bim_file['a1'] = self.__snpobj.variants_ref
110
111        # Save .bim file
112        bim_file.to_csv(self.__filename.with_suffix('.bim'), sep='\t', index=False, header=False)
113        log.info(f"Finished writing .bim file: {self.__filename}")

Writes an object in bed/bim/fam formats in the specified output path.

Arguments:
  • snpobj: The SNPObject to be written.
  • filename: The output file path.
BEDWriter(snpobj: snputils.snp.genobj.SNPObject, filename: str)
23    def __init__(self, snpobj: SNPObject, filename: str):
24        self.__snpobj = snpobj.copy()
25        self.__filename = Path(filename)
def write( self, rename_missing_values: bool = True, before: Union[int, float, str] = -1, after: Union[int, float, str] = '.'):
 27    def write(
 28            self,
 29            rename_missing_values: bool = True, 
 30            before: Union[int, float, str] = -1, 
 31            after: Union[int, float, str] = '.'
 32        ):
 33        """
 34        Writes the SNPObject to bed/bim/fam formats.
 35
 36        Args:
 37            rename_missing_values (bool, optional):
 38                If True, renames potential missing values in `snpobj.calldata_gt` before writing. 
 39                Defaults to True.
 40            before (int, float, or str, default=-1): 
 41                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
 42                Default is -1.
 43            after (int, float, or str, default='.'): 
 44                The value that will replace `before`. Default is '.'.
 45        """
 46        # Save .bed file
 47        if self.__filename.suffix != '.bed':
 48            self.__filename = self.__filename.with_suffix('.bed')
 49
 50        log.info(f"Writing .bed file: {self.__filename}")
 51
 52        # Optionally rename potential missing values in `snpobj.calldata_gt` before writing
 53        if rename_missing_values:
 54            self.__snpobj.rename_missings(before=before, after=after, inplace=True)
 55
 56        # If the input matrix has three dimensions, it indicates that the data is divided into two strands.
 57        if len(self.__snpobj.calldata_gt.shape) == 3:
 58            # Sum the two strands
 59            self.__snpobj.calldata_gt = self.__snpobj.calldata_gt.transpose(1, 0, 2).sum(axis=2, dtype=np.int8)
 60
 61        # Infer the number of samples and variants from the matrix
 62        samples, variants = self.__snpobj.calldata_gt.shape
 63
 64        # Define the PgenWriter to save the data
 65        data_save = pg.PgenWriter(filename=str(self.__filename).encode('utf-8'),
 66                                  sample_ct=samples,
 67                                  variant_ct=variants,
 68                                  nonref_flags=True,
 69                                  hardcall_phase_present=False,
 70                                  dosage_present=True,
 71                                  dosage_phase_present=False)
 72
 73        # Fill the data_save object with the matrix of individuals x variants
 74        for snp_i in range(0, variants):
 75            data_save.append_biallelic(np.ascontiguousarray(self.__snpobj.calldata_gt[:, snp_i]))
 76
 77        # Save the .bed file
 78        data_save.close()
 79
 80        log.info(f"Finished writing .bed file: {self.__filename}")
 81
 82        # Remove .bed from the file name
 83        if self.__filename.suffix == '.bed':
 84            self.__filename = self.__filename.with_suffix('')
 85
 86        # Save .fam file
 87        log.info(f"Writing .fam file: {self.__filename}")
 88
 89        # Fill .fam file
 90        fam_file = pd.DataFrame(columns=['fid', 'iid', 'father', 'mother', 'gender', 'trait'])
 91        fam_file['iid'] = self.__snpobj.samples
 92        fam_file['fid'] = self.__snpobj.samples
 93
 94        # Save .fam file
 95        fam_file.to_csv(self.__filename.with_suffix('.fam'), sep='\t', index=False, header=False)
 96        log.info(f"Finished writing .fam file: {self.__filename}")
 97
 98        # Save .bim file
 99        log.info(f"Writing .bim file: {self.__filename}")
100
101        # Fill .bim file
102        bim_file = pd.DataFrame(columns=['chrom', 'snp', 'cm', 'pos', 'a0', 'a1'])
103        bim_file['chrom'] = self.__snpobj.variants_chrom
104        bim_file['snp'] = self.__snpobj.variants_id
105        bim_file['cm'] = 0  # TODO: read, save and write too if available?
106        log.warning("The .bim file is being saved with 0 cM values.")
107        bim_file['pos'] = self.__snpobj.variants_pos
108        bim_file['a0'] = self.__snpobj.variants_alt
109        bim_file['a1'] = self.__snpobj.variants_ref
110
111        # Save .bim file
112        bim_file.to_csv(self.__filename.with_suffix('.bim'), sep='\t', index=False, header=False)
113        log.info(f"Finished writing .bim file: {self.__filename}")

Writes the SNPObject to bed/bim/fam formats.

Arguments:
  • rename_missing_values (bool, optional): If True, renames potential missing values in snpobj.calldata_gt before writing. Defaults to True.
  • before (int, float, or str, default=-1): The current representation of missing values in calldata_gt. Common values might be -1, '.', or NaN. Default is -1.
  • after (int, float, or str, default='.'): The value that will replace before. Default is '.'.
class PGENWriter:
 15class PGENWriter:
 16    """
 17    Writes a genotype object in PGEN format (.pgen, .psam, and .pvar files) in the specified output path.
 18    """
 19
 20    def __init__(self, snpobj: SNPObject, filename: str):
 21        """
 22        Initializes the PGENWriter instance.
 23
 24        Args:
 25            snpobj (SNPObject): The SNPObject containing genotype data to be written.
 26            filename (str): Base path for the output files (excluding extension).
 27        """
 28        self.__snpobj = snpobj
 29        self.__filename = Path(filename)
 30
 31    def write(
 32            self, 
 33            vzs: bool = False,
 34            rename_missing_values: bool = True, 
 35            before: Union[int, float, str] = -1, 
 36            after: Union[int, float, str] = '.'
 37        ):
 38        """
 39        Writes the SNPObject data to .pgen, .psam, and .pvar files.
 40
 41        Args:
 42            vzs (bool, optional): 
 43                If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
 44            rename_missing_values (bool, optional):
 45                If True, renames potential missing values in `snpobj.calldata_gt` before writing. 
 46                Defaults to True.
 47            before (int, float, or str, default=-1): 
 48                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
 49                Default is -1.
 50            after (int, float, or str, default='.'): 
 51                The value that will replace `before`. Default is '.'.
 52        """
 53        file_extensions = (".pgen", ".psam", ".pvar", ".pvar.zst")
 54        if self.__filename.suffix in file_extensions:
 55            self.__filename = self.__filename.with_suffix('')
 56
 57        # Optionally rename potential missing values in `snpobj.calldata_gt` before writing
 58        if rename_missing_values:
 59            self.__snpobj.rename_missings(before=before, after=after, inplace=True)
 60
 61        self.write_pvar(vzs=vzs)
 62        self.write_psam()
 63        self.write_pgen()
 64
 65    def write_pvar(self, vzs: bool = False):
 66        """
 67        Writes variant data to the .pvar file.
 68
 69        Args:
 70            vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
 71        """
 72        output_filename = f"{self.__filename}.pvar"
 73        if vzs:
 74            output_filename += ".zst"
 75            log.info(f"Writing to {output_filename} (compressed)")
 76        else:
 77            log.info(f"Writing to {output_filename}")
 78
 79        df = pl.DataFrame(
 80            {
 81                "#CHROM": self.__snpobj.variants_chrom,
 82                "POS": self.__snpobj.variants_pos,
 83                "ID": self.__snpobj.variants_id,
 84                "REF": self.__snpobj.variants_ref,
 85                "ALT": self.__snpobj.variants_alt,
 86                "FILTER": self.__snpobj.variants_filter_pass,
 87                # TODO: add INFO column to SNPObject and write it to the .pvar file? (if not it's lost)
 88            }
 89        )
 90        # TODO: add header to the .pvar file, if not it's lost
 91
 92        # Write the DataFrame to a CSV string
 93        csv_data = df.write_csv(None, separator="\t")
 94
 95        if vzs:
 96            # Compress the CSV data using zstd
 97            cctx = zstd.ZstdCompressor()
 98            compressed_data = cctx.compress(csv_data.encode('utf-8'))
 99            with open(output_filename, 'wb') as f:
100                f.write(compressed_data)
101        else:
102            with open(output_filename, 'w') as f:
103                f.write(csv_data)
104
105    def write_psam(self):
106        """
107        Writes sample metadata to the .psam file.
108        """
109        log.info(f"Writing {self.__filename}.psam")
110        df = pl.DataFrame(
111            {
112                "#IID": self.__snpobj.samples,
113                "SEX": "NA",  # Add SEX as nan for now
114                # TODO: add SEX as Optional column to SNPObject and write it to the .psam file (if not it's lost)
115            }
116        )
117        df.write_csv(f"{self.__filename}.psam", separator="\t")
118
119    def write_pgen(self):
120        """
121        Writes the genotype data to a .pgen file.
122        """
123        log.info(f"Writing to {self.__filename}.pgen")
124        summed_strands = False if self.__snpobj.calldata_gt.ndim == 3 else True
125        if not summed_strands:
126            num_variants, num_samples, num_alleles = self.__snpobj.calldata_gt.shape
127            # Flatten the genotype matrix for pgenlib
128            flat_genotypes = self.__snpobj.calldata_gt.reshape(
129                num_variants, num_samples * num_alleles
130            )
131            with pg.PgenWriter(
132                filename=f"{self.__filename}.pgen".encode('utf-8'),
133                sample_ct=num_samples,
134                variant_ct=num_variants,
135                hardcall_phase_present=True,
136            ) as writer:
137                for variant_index in range(num_variants):
138                    writer.append_alleles(
139                        flat_genotypes[variant_index].astype(np.int32), all_phased=True
140                    )
141        else:
142            num_variants, num_samples = self.__snpobj.calldata_gt.shape
143            # Transpose to (samples, variants)
144            genotypes = self.__snpobj.calldata_gt.T  # Shape is (samples, variants)
145            with pg.PgenWriter(
146                filename=f"{self.__filename}.pgen".encode('utf-8'),
147                sample_ct=num_samples,
148                variant_ct=num_variants,
149                hardcall_phase_present=False,
150            ) as writer:
151                for variant_index in range(num_variants):
152                    variant_genotypes = genotypes[:, variant_index].astype(np.int8)
153                    # Map missing genotypes to -9 if necessary
154                    variant_genotypes[variant_genotypes == -1] = -9
155                    writer.append_biallelic(np.ascontiguousarray(variant_genotypes))

Writes a genotype object in PGEN format (.pgen, .psam, and .pvar files) in the specified output path.

PGENWriter(snpobj: snputils.snp.genobj.SNPObject, filename: str)
20    def __init__(self, snpobj: SNPObject, filename: str):
21        """
22        Initializes the PGENWriter instance.
23
24        Args:
25            snpobj (SNPObject): The SNPObject containing genotype data to be written.
26            filename (str): Base path for the output files (excluding extension).
27        """
28        self.__snpobj = snpobj
29        self.__filename = Path(filename)

Initializes the PGENWriter instance.

Arguments:
  • snpobj (SNPObject): The SNPObject containing genotype data to be written.
  • filename (str): Base path for the output files (excluding extension).
def write( self, vzs: bool = False, rename_missing_values: bool = True, before: Union[int, float, str] = -1, after: Union[int, float, str] = '.'):
31    def write(
32            self, 
33            vzs: bool = False,
34            rename_missing_values: bool = True, 
35            before: Union[int, float, str] = -1, 
36            after: Union[int, float, str] = '.'
37        ):
38        """
39        Writes the SNPObject data to .pgen, .psam, and .pvar files.
40
41        Args:
42            vzs (bool, optional): 
43                If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
44            rename_missing_values (bool, optional):
45                If True, renames potential missing values in `snpobj.calldata_gt` before writing. 
46                Defaults to True.
47            before (int, float, or str, default=-1): 
48                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
49                Default is -1.
50            after (int, float, or str, default='.'): 
51                The value that will replace `before`. Default is '.'.
52        """
53        file_extensions = (".pgen", ".psam", ".pvar", ".pvar.zst")
54        if self.__filename.suffix in file_extensions:
55            self.__filename = self.__filename.with_suffix('')
56
57        # Optionally rename potential missing values in `snpobj.calldata_gt` before writing
58        if rename_missing_values:
59            self.__snpobj.rename_missings(before=before, after=after, inplace=True)
60
61        self.write_pvar(vzs=vzs)
62        self.write_psam()
63        self.write_pgen()

Writes the SNPObject data to .pgen, .psam, and .pvar files.

Arguments:
  • vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
  • rename_missing_values (bool, optional): If True, renames potential missing values in snpobj.calldata_gt before writing. Defaults to True.
  • before (int, float, or str, default=-1): The current representation of missing values in calldata_gt. Common values might be -1, '.', or NaN. Default is -1.
  • after (int, float, or str, default='.'): The value that will replace before. Default is '.'.
def write_pvar(self, vzs: bool = False):
 65    def write_pvar(self, vzs: bool = False):
 66        """
 67        Writes variant data to the .pvar file.
 68
 69        Args:
 70            vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
 71        """
 72        output_filename = f"{self.__filename}.pvar"
 73        if vzs:
 74            output_filename += ".zst"
 75            log.info(f"Writing to {output_filename} (compressed)")
 76        else:
 77            log.info(f"Writing to {output_filename}")
 78
 79        df = pl.DataFrame(
 80            {
 81                "#CHROM": self.__snpobj.variants_chrom,
 82                "POS": self.__snpobj.variants_pos,
 83                "ID": self.__snpobj.variants_id,
 84                "REF": self.__snpobj.variants_ref,
 85                "ALT": self.__snpobj.variants_alt,
 86                "FILTER": self.__snpobj.variants_filter_pass,
 87                # TODO: add INFO column to SNPObject and write it to the .pvar file? (if not it's lost)
 88            }
 89        )
 90        # TODO: add header to the .pvar file, if not it's lost
 91
 92        # Write the DataFrame to a CSV string
 93        csv_data = df.write_csv(None, separator="\t")
 94
 95        if vzs:
 96            # Compress the CSV data using zstd
 97            cctx = zstd.ZstdCompressor()
 98            compressed_data = cctx.compress(csv_data.encode('utf-8'))
 99            with open(output_filename, 'wb') as f:
100                f.write(compressed_data)
101        else:
102            with open(output_filename, 'w') as f:
103                f.write(csv_data)

Writes variant data to the .pvar file.

Arguments:
  • vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
def write_psam(self):
105    def write_psam(self):
106        """
107        Writes sample metadata to the .psam file.
108        """
109        log.info(f"Writing {self.__filename}.psam")
110        df = pl.DataFrame(
111            {
112                "#IID": self.__snpobj.samples,
113                "SEX": "NA",  # Add SEX as nan for now
114                # TODO: add SEX as Optional column to SNPObject and write it to the .psam file (if not it's lost)
115            }
116        )
117        df.write_csv(f"{self.__filename}.psam", separator="\t")

Writes sample metadata to the .psam file.

def write_pgen(self):
119    def write_pgen(self):
120        """
121        Writes the genotype data to a .pgen file.
122        """
123        log.info(f"Writing to {self.__filename}.pgen")
124        summed_strands = False if self.__snpobj.calldata_gt.ndim == 3 else True
125        if not summed_strands:
126            num_variants, num_samples, num_alleles = self.__snpobj.calldata_gt.shape
127            # Flatten the genotype matrix for pgenlib
128            flat_genotypes = self.__snpobj.calldata_gt.reshape(
129                num_variants, num_samples * num_alleles
130            )
131            with pg.PgenWriter(
132                filename=f"{self.__filename}.pgen".encode('utf-8'),
133                sample_ct=num_samples,
134                variant_ct=num_variants,
135                hardcall_phase_present=True,
136            ) as writer:
137                for variant_index in range(num_variants):
138                    writer.append_alleles(
139                        flat_genotypes[variant_index].astype(np.int32), all_phased=True
140                    )
141        else:
142            num_variants, num_samples = self.__snpobj.calldata_gt.shape
143            # Transpose to (samples, variants)
144            genotypes = self.__snpobj.calldata_gt.T  # Shape is (samples, variants)
145            with pg.PgenWriter(
146                filename=f"{self.__filename}.pgen".encode('utf-8'),
147                sample_ct=num_samples,
148                variant_ct=num_variants,
149                hardcall_phase_present=False,
150            ) as writer:
151                for variant_index in range(num_variants):
152                    variant_genotypes = genotypes[:, variant_index].astype(np.int8)
153                    # Map missing genotypes to -9 if necessary
154                    variant_genotypes[variant_genotypes == -1] = -9
155                    writer.append_biallelic(np.ascontiguousarray(variant_genotypes))

Writes the genotype data to a .pgen file.

class VCFWriter:
    """
    A writer class for exporting SNP data from a `snputils.snp.genobj.SNPObject`
    into a `.vcf` file.
    """
    def __init__(self, snpobj: "SNPObject", filename: str, n_jobs: int = -1, phased: bool = False):
        """
        Args:
            snpobj (SNPObject):
                A SNPObject instance.
            filename (str or pathlib.Path):
                Path to the file where the data will be saved. It should end with `.vcf`.
                If the provided path does not have this extension, the `.vcf` extension will be appended.
            n_jobs:
                Number of jobs to run in parallel.
                - `None`: use 1 job unless within a `joblib.parallel_backend` context.
                - `-1`: use all available processors.
                - Any other integer: use the specified number of jobs.
            phased:
                If True, genotype data is written in "maternal|paternal" format.
                If False, genotype data is written in "maternal/paternal" format.
        """
        self.__snpobj = snpobj
        self.__filename = Path(filename)
        # NOTE(review): n_jobs is stored but not currently used by this writer;
        # kept for interface compatibility.
        self.__n_jobs = n_jobs
        self.__phased = phased

    def write(
            self,
            chrom_partition: bool = False,
            rename_missing_values: bool = True,
            before: Union[int, float, str] = -1,
            after: Union[int, float, str] = '.'
        ):
        """
        Writes the SNP data to VCF file(s).

        Args:
            chrom_partition (bool, optional):
                If True, individual VCF files are generated for each chromosome.
                If False, a single VCF file containing data for all chromosomes is created. Defaults to False.
            rename_missing_values (bool, optional):
                If True, renames potential missing values in `snpobj.calldata_gt` before writing.
                Defaults to True.
            before (int, float, or str, default=-1):
                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
                Default is -1.
            after (int, float, or str, default='.'):
                The value that will replace `before`. Default is '.'.
        """
        self.__chrom_partition = chrom_partition

        # Accept an explicit .vcf or .bcf suffix; default to .vcf otherwise.
        # NOTE(review): a ".bcf" suffix only changes the output filename -- the
        # content is still written as plain VCF text; confirm this is intended.
        file_extensions = (".vcf", ".bcf")
        if self.__filename.suffix in file_extensions:
            self.__file_extension = self.__filename.suffix
            self.__filename = self.__filename.with_suffix('')
        else:
            self.__file_extension = ".vcf"

        # Optionally rename potential missing values in `snpobj.calldata_gt` before writing
        if rename_missing_values:
            self.__snpobj.rename_missings(before=before, after=after, inplace=True)

        data = self.__snpobj

        if self.__chrom_partition:
            for chrom in data.unique_chrom:
                # Filter to include the data for the chromosome in particular
                data_chrom = data.filter_variants(chrom=chrom, inplace=False)

                log.debug(f'Storing chromosome {chrom}')
                self._write_chromosome_data(chrom, data_chrom)
        else:
            self._write_chromosome_data("All", data)

    def _write_chromosome_data(self, chrom, data_chrom):
        """
        Writes the SNP data for a specific chromosome to a VCF file.

        Args:
            chrom: The chromosome name, or "All" when writing a single combined file.
            data_chrom: The SNPObject instance containing the data for the chromosome.
        """
        # Obtain genotype matrix; assumed shape (n_variants, n_samples, 2) -- ploidy 2.
        npy3 = data_chrom.calldata_gt
        n_variants, n_samples, _ = npy3.shape

        # Use the stored sample names when they match the genotype matrix;
        # otherwise generate placeholder names.
        if len(data_chrom.samples) == n_samples:
            data_samples = list(data_chrom.samples)
        else:
            data_samples = [get_name() for _ in range(n_samples)]

        # Determine the output path for this chromosome.
        if chrom == "All":
            file = self.__filename.with_suffix(self.__file_extension)
        else:
            file = self.__filename.parent / f"{self.__filename.stem}_{chrom}{self.__file_extension}"

        # BUG FIX: previously this always opened `self.__filename` instead of
        # `file`, so with chrom_partition=True every chromosome overwrote the
        # same output file. A context manager also guarantees the file closes.
        with open(file, "w") as out:
            # --- write VCF header ---
            out.write("##fileformat=VCFv4.1\n")
            out.write('##FORMAT=<ID=GT,Number=1,Type=String,Description="Phased Genotype">\n')

            for c in set(data_chrom.variants_chrom):
                out.write(f"##contig=<ID={c}>\n")
            # BUG FIX: use `data_samples` (which falls back to placeholder names)
            # instead of `data_chrom.samples`, which was computed but never used.
            cols = ["#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO", "FORMAT"] + data_samples
            out.write("\t".join(cols) + "\n")

            sep = "|" if self.__phased else "/"
            for i in range(n_variants):
                # Per-variant metadata (loop local renamed from `chrom`, which
                # shadowed the method parameter).
                variant_chrom = data_chrom.variants_chrom[i]
                pos = data_chrom.variants_pos[i]
                vid = data_chrom.variants_id[i]
                ref = data_chrom.variants_ref[i]
                alt = data_chrom.variants_alt[i]

                # build genotype list per sample, one small array at a time
                row = npy3[i]  # shape: (n_samples, 2)
                genotypes = [
                    f"{row[s, 0]}{sep}{row[s, 1]}"
                    for s in range(n_samples)
                ]

                line = "\t".join([
                    str(variant_chrom), str(pos), vid, ref, alt,
                    ".", "PASS", ".", "GT", *genotypes
                ])
                out.write(line + "\n")

A writer class for exporting SNP data from a snputils.snp.genobj.SNPObject into a .vcf file.

VCFWriter( snpobj: snputils.snp.genobj.SNPObject, filename: str, n_jobs: int = -1, phased: bool = False)
17    def __init__(self, snpobj: SNPObject, filename: str, n_jobs: int = -1, phased: bool = False):
18        """
19        Args:
20            snpobj (SNPObject):
21                A SNPObject instance.
22            filename (str or pathlib.Path): 
23                Path to the file where the data will be saved. It should end with `.vcf`. 
24                If the provided path does not have this extension, the `.vcf` extension will be appended.
25            n_jobs: 
26                Number of jobs to run in parallel. 
27                - `None`: use 1 job unless within a `joblib.parallel_backend` context.  
28                - `-1`: use all available processors.  
29                - Any other integer: use the specified number of jobs.
30            phased: 
31                If True, genotype data is written in "maternal|paternal" format.  
32                If False, genotype data is written in "maternal/paternal" format.
33        """
34        self.__snpobj = snpobj
35        self.__filename = Path(filename)
36        self.__n_jobs = n_jobs
37        self.__phased = phased
Arguments:
  • snpobj (SNPObject): A SNPObject instance.
  • filename (str or pathlib.Path): Path to the file where the data will be saved. It should end with .vcf. If the provided path does not have this extension, the .vcf extension will be appended.
  • n_jobs: Number of jobs to run in parallel.
    • None: use 1 job unless within a joblib.parallel_backend context.
    • -1: use all available processors.
    • Any other integer: use the specified number of jobs.
  • phased: If True, genotype data is written in "maternal|paternal" format.
    If False, genotype data is written in "maternal/paternal" format.
def write( self, chrom_partition: bool = False, rename_missing_values: bool = True, before: Union[int, float, str] = -1, after: Union[int, float, str] = '.'):
39    def write(
40            self, 
41            chrom_partition: bool = False,
42            rename_missing_values: bool = True, 
43            before: Union[int, float, str] = -1, 
44            after: Union[int, float, str] = '.'
45        ):
46        """
47        Writes the SNP data to VCF file(s).
48
49        Args:
50            chrom_partition (bool, optional):
51                If True, individual VCF files are generated for each chromosome.
52                If False, a single VCF file containing data for all chromosomes is created. Defaults to False.
53            rename_missing_values (bool, optional):
54                If True, renames potential missing values in `snpobj.calldata_gt` before writing. 
55                Defaults to True.
56            before (int, float, or str, default=-1): 
57                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
58                Default is -1.
59            after (int, float, or str, default='.'): 
60                The value that will replace `before`. Default is '.'.
61
62        Note:
63            If the target filename already ends in `.vcf` or `.bcf`, that
64            extension is kept; otherwise `.vcf` is appended. The file content
65            is always written as plain VCF text.
66        """
67        self.__chrom_partition = chrom_partition
68
69        file_extensions = (".vcf", ".bcf")
70        if self.__filename.suffix in file_extensions:
71            self.__file_extension = self.__filename.suffix
72            self.__filename = self.__filename.with_suffix('')
73        else:
74            self.__file_extension = ".vcf"
75
76        # Optionally rename potential missing values in `snpobj.calldata_gt` before writing
77        if rename_missing_values:
78            self.__snpobj.rename_missings(before=before, after=after, inplace=True)
79
80        data = self.__snpobj
81
82        if self.__chrom_partition:
83            chroms = data.unique_chrom
84
85            for chrom in chroms:
86                # Filter to include the data for the chromosome in particular
87                data_chrom = data.filter_variants(chrom=chrom, inplace=False)
88
89                log.debug(f'Storing chromosome {chrom}')
90                self._write_chromosome_data(chrom, data_chrom)
91        else:
92            self._write_chromosome_data("All", data)

Writes the SNP data to VCF file(s).

Arguments:
  • chrom_partition (bool, optional): If True, individual VCF files are generated for each chromosome. If False, a single VCF file containing data for all chromosomes is created. Defaults to False.
  • rename_missing_values (bool, optional): If True, renames potential missing values in snpobj.calldata_gt before writing. Defaults to True.
  • before (int, float, or str, default=-1): The current representation of missing values in calldata_gt. Common values might be -1, '.', or NaN. Default is -1.
  • after (int, float, or str, default='.'): The value that will replace before. Default is '.'.