snputils.snp.io

1from .read import SNPReader, BEDReader, PGENReader, VCFReader, read_snp, read_bed, read_pgen, read_vcf
2from .write import BEDWriter, PGENWriter, VCFWriter
3
4__all__ = ['read_snp', 'read_bed', 'read_pgen', 'read_vcf',
5           'SNPReader', 'BEDReader', 'PGENReader', 'VCFReader',
6           'BEDWriter', 'PGENWriter', 'VCFWriter']
def read_snp( filename: Union[str, pathlib._local.Path], **kwargs) -> snputils.snp.genobj.SNPObject:
 8def read_snp(filename: Union[str, pathlib.Path], **kwargs) -> SNPObject:
 9    """
10    Automatically detect the file format and read it into a SNPObject.
11
12    Args:
13        filename: Filename of the file to read.
14        **kwargs: Additional arguments passed to the reader method.
15
16    Raises:
17        ValueError: If the filename does not have an extension or the extension is not supported.
18    """
19    from snputils.snp.io.read.auto import SNPReader
20
21    return SNPReader(filename).read(**kwargs)

Automatically detect the file format and read it into a SNPObject.

Arguments:
  • filename: Filename of the file to read.
  • **kwargs: Additional arguments passed to the reader method.
Raises:
  • ValueError: If the filename does not have an extension or the extension is not supported.
def read_bed( filename: Union[str, pathlib._local.Path], **kwargs) -> snputils.snp.genobj.SNPObject:
24def read_bed(filename: Union[str, pathlib.Path], **kwargs) -> SNPObject:
25    """
26    Read a BED fileset into a SNPObject.
27
28    Args:
29        filename: Filename of the BED fileset to read.
30        **kwargs: Additional arguments passed to the reader method. See :class:`snputils.snp.io.read.bed.BEDReader` for possible parameters.
31    """
32    from snputils.snp.io.read.bed import BEDReader
33
34    return BEDReader(filename).read(**kwargs)

Read a BED fileset into a SNPObject.

Arguments:
  • filename: Filename of the BED fileset to read.
  • **kwargs: Additional arguments passed to the reader method. See snputils.snp.io.read.bed.BEDReader for possible parameters.
def read_pgen( filename: Union[str, pathlib._local.Path], **kwargs) -> snputils.snp.genobj.SNPObject:
37def read_pgen(filename: Union[str, pathlib.Path], **kwargs) -> SNPObject:
38    """
39    Read a PGEN fileset into a SNPObject.
40
41    Args:
42        filename: Filename of the PGEN fileset to read.
43        **kwargs: Additional arguments passed to the reader method. See :class:`snputils.snp.io.read.pgen.PGENReader` for possible parameters.
44    """
45    from snputils.snp.io.read.pgen import PGENReader
46
47    return PGENReader(filename).read(**kwargs)

Read a PGEN fileset into a SNPObject.

Arguments:
def read_vcf( filename: Union[str, pathlib._local.Path], backend: str = 'polars', **kwargs) -> snputils.snp.genobj.SNPObject:
50def read_vcf(filename: Union[str, pathlib.Path], 
51             backend: str = 'polars',
52             **kwargs) -> SNPObject:
53    """
54    Read a VCF fileset into a SNPObject.
55
56    Args:
57        filename: Filename of the VCF fileset to read.
58        backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'.
59        **kwargs: Additional arguments passed to the reader method. See :class:`snputils.snp.io.read.vcf.VCFReader` for possible parameters.
60    """
61    from snputils.snp.io.read.vcf import VCFReader, VCFReaderPolars
62    if backend == 'polars':
63        print(f"Reading {filename} with polars backend")
64        return VCFReaderPolars(filename).read(**kwargs)
65    else:
66        print(f"Reading {filename} with scikit-allel backend")
67        return VCFReader(filename).read(**kwargs)

Read a VCF fileset into a SNPObject.

Arguments:
  • filename: Filename of the VCF fileset to read.
  • backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'.
  • **kwargs: Additional arguments passed to the reader method. See snputils.snp.io.read.vcf.VCFReader for possible parameters.
class SNPReader:
 8class SNPReader:
 9    def __new__(cls,
10                filename: Union[str, pathlib.Path],
11                vcf_backend: str = 'polars') -> SNPReader:
12        """
13        Automatically detect the SNP file format from the file extension, and return its corresponding reader.
14
15        Args:
16            filename: Filename of the file to read.
17            vcf_backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. Default is 'polars'.
18
19        Raises:
20            ValueError: If the filename does not have an extension or the extension is not supported.
21        """
22        filename = pathlib.Path(filename)
23        suffixes = filename.suffixes
24        if not suffixes:
25            raise ValueError("The filename should have an extension when using SNPReader.")
26
27        extension = suffixes[-2] if suffixes[-1].lower() in (".zst", ".gz") else suffixes[-1]
28        extension = extension.lower()
29
30        if extension == ".vcf":
31            if vcf_backend == 'polars':
32                from snputils.snp.io.read.vcf import VCFReaderPolars
33
34                return VCFReaderPolars(filename)
35            elif vcf_backend == 'scikit-allel':
36                from snputils.snp.io.read.vcf import VCFReader
37
38                return VCFReader(filename)
39            else:
40                raise ValueError(f"VCF backend not supported: {vcf_backend}")
41        elif extension in (".bed", ".bim", ".fam"):
42            from snputils.snp.io.read.bed import BEDReader
43
44            return BEDReader(filename)
45        elif extension in (".pgen", ".pvar", ".psam", ".pvar.zst"):
46            from snputils.snp.io.read.pgen import PGENReader
47
48            return PGENReader(filename)
49        else:
50            raise ValueError(f"File format not supported: {filename}")
SNPReader( filename: Union[str, pathlib._local.Path], vcf_backend: str = 'polars')
 9    def __new__(cls,
10                filename: Union[str, pathlib.Path],
11                vcf_backend: str = 'polars') -> SNPReader:
12        """
13        Automatically detect the SNP file format from the file extension, and return its corresponding reader.
14
15        Args:
16            filename: Filename of the file to read.
17            vcf_backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. Default is 'polars'.
18
19        Raises:
20            ValueError: If the filename does not have an extension or the extension is not supported.
21        """
22        filename = pathlib.Path(filename)
23        suffixes = filename.suffixes
24        if not suffixes:
25            raise ValueError("The filename should have an extension when using SNPReader.")
26
27        extension = suffixes[-2] if suffixes[-1].lower() in (".zst", ".gz") else suffixes[-1]
28        extension = extension.lower()
29
30        if extension == ".vcf":
31            if vcf_backend == 'polars':
32                from snputils.snp.io.read.vcf import VCFReaderPolars
33
34                return VCFReaderPolars(filename)
35            elif vcf_backend == 'scikit-allel':
36                from snputils.snp.io.read.vcf import VCFReader
37
38                return VCFReader(filename)
39            else:
40                raise ValueError(f"VCF backend not supported: {vcf_backend}")
41        elif extension in (".bed", ".bim", ".fam"):
42            from snputils.snp.io.read.bed import BEDReader
43
44            return BEDReader(filename)
45        elif extension in (".pgen", ".pvar", ".psam", ".pvar.zst"):
46            from snputils.snp.io.read.pgen import PGENReader
47
48            return PGENReader(filename)
49        else:
50            raise ValueError(f"File format not supported: {filename}")

Automatically detect the SNP file format from the file extension, and return its corresponding reader.

Arguments:
  • filename: Filename of the file to read.
  • vcf_backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. Default is 'polars'.
Raises:
  • ValueError: If the filename does not have an extension or the extension is not supported.
@SNPBaseReader.register
class BEDReader(snputils.snp.io.read.base.SNPBaseReader):
 16@SNPBaseReader.register
 17class BEDReader(SNPBaseReader):
 18    def read(
 19        self,
 20        fields: Optional[List[str]] = None,
 21        exclude_fields: Optional[List[str]] = None,
 22        sample_ids: Optional[np.ndarray] = None,
 23        sample_idxs: Optional[np.ndarray] = None,
 24        variant_ids: Optional[np.ndarray] = None,
 25        variant_idxs: Optional[np.ndarray] = None,
 26        sum_strands: bool = False,
 27        separator: Optional[str] = None,
 28    ) -> SNPObject:
 29        """
 30        Read a bed fileset (bed, bim, fam) into a SNPObject.
 31
 32        Args:
 33            fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject.
 34                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'.
 35                To extract all fields, set fields to None. Defaults to None.
 36            exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject.
 37                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'.
 38                To exclude no fields, set exclude_fields to None. Defaults to None.
 39            sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
 40            sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
 41            variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
 42            variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
 43            sum_strands: True if the maternal and paternal strands are to be summed together,
 44                False if the strands are to be stored separately. Note that due to the pgenlib backend, when sum_strands is False,
 45                8 times as much RAM is required. Nonetheless, the calldata_gt will only be double the size.
 46                WARNING: bed files do not store phase information. If you need it, use vcf or pgen.
 47            separator: Separator used in the pvar file. If None, the separator is automatically detected.
 48                If the automatic detection fails, please specify the separator manually.
 49
 50        Returns:
 51            snpobj: SNPObject containing the data from the pgen fileset.
 52                If sum_strands is False, calldata_gt is stored as a numpy array of shape
 53                (num_variants, num_samples, 2) and dtype int8 containing 0, 1.
 54                If sum_strands is True, calldata_gt is stored as a numpy array of shape
 55                (num_variants, num_samples) and dtype int8 containing 0, 1, 2.
 56
 57        Raises:
 58            AssertionError: If both sample_idxs and sample_ids are specified.
 59            AssertionError: If both variant_idxs and variant_ids are specified.
 60        """
 61        assert (
 62            sample_idxs is None or sample_ids is None
 63        ), "Only one of sample_idxs and sample_ids can be specified"
 64        assert (
 65            variant_idxs is None or variant_ids is None
 66        ), "Only one of variant_idxs and variant_ids can be specified"
 67
 68        if isinstance(fields, str):
 69            fields = [fields]
 70        if isinstance(exclude_fields, str):
 71            exclude_fields = [exclude_fields]
 72
 73        fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS"]
 74        exclude_fields = exclude_fields or []
 75        fields = [field for field in fields if field not in exclude_fields]
 76        only_read_bed = fields == ["GT"] and variant_idxs is None and sample_idxs is None
 77
 78        filename_noext = str(self.filename)
 79        if filename_noext[-4:].lower() in (".bed", ".bim", ".fam"):
 80            filename_noext = filename_noext[:-4]
 81
 82        if only_read_bed:
 83            with open(filename_noext + '.fam', 'r') as f:
 84                file_num_samples = sum(1 for _ in f)  # Get sample count from fam file
 85            file_num_variants = None  # Not needed
 86        else:
 87            log.info(f"Reading {filename_noext}.bim")
 88
 89            if separator is None:
 90                with open(filename_noext + ".bim", "r") as file:
 91                    separator = csv.Sniffer().sniff(file.readline()).delimiter
 92
 93            bim = pl.read_csv(
 94                filename_noext + ".bim",
 95                separator=separator,
 96                has_header=False,
 97                new_columns=["#CHROM", "ID", "CM", "POS", "ALT", "REF"],
 98                schema_overrides={
 99                    "#CHROM": pl.String,
100                    "ID": pl.String,
101                    "CM": pl.Float64,
102                    "POS": pl.Int64,
103                    "ALT": pl.String,
104                    "REF": pl.String
105                },
106                null_values=["NA"]
107            ).with_row_index()
108            file_num_variants = bim.height
109
110            if variant_ids is not None:
111                variant_idxs = bim.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy()
112
113            if variant_idxs is None:
114                num_variants = file_num_variants
115                variant_idxs = np.arange(num_variants, dtype=np.uint32)
116            else:
117                num_variants = np.size(variant_idxs)
118                variant_idxs = np.array(variant_idxs, dtype=np.uint32)
119                bim = bim.filter(pl.col("index").is_in(variant_idxs))
120
121            log.info(f"Reading {filename_noext}.fam")
122
123            fam = pl.read_csv(
124                filename_noext + ".fam",
125                separator=separator,
126                has_header=False,
127                new_columns=["Family ID", "IID", "Father ID",
128                             "Mother ID", "Sex code", "Phenotype value"],
129                schema_overrides={
130                    "Family ID": pl.String,
131                    "IID": pl.String,
132                    "Father ID": pl.String,
133                    "Mother ID": pl.String,
134                    "Sex code": pl.String,
135                },
136                null_values=["NA"]
137            ).with_row_index()
138            file_num_samples = fam.height
139
140            if sample_ids is not None:
141                sample_idxs = fam.filter(pl.col("IID").is_in(sample_ids)).select("index").to_series().to_numpy()
142
143            if sample_idxs is None:
144                num_samples = file_num_samples
145            else:
146                num_samples = np.size(sample_idxs)
147                sample_idxs = np.array(sample_idxs, dtype=np.uint32)
148                fam = fam.filter(pl.col("index").is_in(sample_idxs))
149
150        if "GT" in fields:
151            log.info(f"Reading {filename_noext}.bed")
152            pgen_reader = pg.PgenReader(
153                str.encode(filename_noext + ".bed"),
154                raw_sample_ct=file_num_samples,
155                variant_ct=file_num_variants,
156                sample_subset=sample_idxs,
157            )
158
159            if only_read_bed:
160                num_samples = pgen_reader.get_raw_sample_ct()
161                num_variants = pgen_reader.get_variant_ct()
162                variant_idxs = np.arange(num_variants, dtype=np.uint32)
163
164            # required arrays: variant_idxs + sample_idxs + genotypes
165            if not sum_strands:
166                required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4
167            else:
168                required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples
169            log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each")
170
171            if not sum_strands:
172                genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32)  # cannot use int8 because of pgenlib
173                pgen_reader.read_alleles_list(variant_idxs, genotypes)
174                genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2))
175            else:
176                genotypes = np.empty((num_variants, num_samples), dtype=np.int8)
177                pgen_reader.read_list(variant_idxs, genotypes)
178            pgen_reader.close()
179        else:
180            genotypes = None
181
182        log.info("Constructing SNPObject")
183
184        snpobj = SNPObject(
185            calldata_gt=genotypes if "GT" in fields else None,
186            samples=fam.get_column("IID").to_numpy() if "IID" in fields and "IID" in fam.columns else None,
187            **{f'variants_{k.lower()}': bim.get_column(v).to_numpy() if v in fields and v in bim.columns else None
188               for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS'}.items()}
189        )
190
191        log.info("Finished constructing SNPObject")
192        return snpobj

Abstract class for SNP readers.

Attributes:
  • _filename: The path to the file storing SNP data.
def read( self, fields: Optional[List[str]] = None, exclude_fields: Optional[List[str]] = None, sample_ids: Optional[numpy.ndarray] = None, sample_idxs: Optional[numpy.ndarray] = None, variant_ids: Optional[numpy.ndarray] = None, variant_idxs: Optional[numpy.ndarray] = None, sum_strands: bool = False, separator: Optional[str] = None) -> snputils.snp.genobj.SNPObject:
 18    def read(
 19        self,
 20        fields: Optional[List[str]] = None,
 21        exclude_fields: Optional[List[str]] = None,
 22        sample_ids: Optional[np.ndarray] = None,
 23        sample_idxs: Optional[np.ndarray] = None,
 24        variant_ids: Optional[np.ndarray] = None,
 25        variant_idxs: Optional[np.ndarray] = None,
 26        sum_strands: bool = False,
 27        separator: Optional[str] = None,
 28    ) -> SNPObject:
 29        """
 30        Read a bed fileset (bed, bim, fam) into a SNPObject.
 31
 32        Args:
 33            fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject.
 34                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'.
 35                To extract all fields, set fields to None. Defaults to None.
 36            exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject.
 37                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'.
 38                To exclude no fields, set exclude_fields to None. Defaults to None.
 39            sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
 40            sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
 41            variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
 42            variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
 43            sum_strands: True if the maternal and paternal strands are to be summed together,
 44                False if the strands are to be stored separately. Note that due to the pgenlib backend, when sum_strands is False,
 45                8 times as much RAM is required. Nonetheless, the calldata_gt will only be double the size.
 46                WARNING: bed files do not store phase information. If you need it, use vcf or pgen.
 47            separator: Separator used in the pvar file. If None, the separator is automatically detected.
 48                If the automatic detection fails, please specify the separator manually.
 49
 50        Returns:
 51            snpobj: SNPObject containing the data from the pgen fileset.
 52                If sum_strands is False, calldata_gt is stored as a numpy array of shape
 53                (num_variants, num_samples, 2) and dtype int8 containing 0, 1.
 54                If sum_strands is True, calldata_gt is stored as a numpy array of shape
 55                (num_variants, num_samples) and dtype int8 containing 0, 1, 2.
 56
 57        Raises:
 58            AssertionError: If both sample_idxs and sample_ids are specified.
 59            AssertionError: If both variant_idxs and variant_ids are specified.
 60        """
 61        assert (
 62            sample_idxs is None or sample_ids is None
 63        ), "Only one of sample_idxs and sample_ids can be specified"
 64        assert (
 65            variant_idxs is None or variant_ids is None
 66        ), "Only one of variant_idxs and variant_ids can be specified"
 67
 68        if isinstance(fields, str):
 69            fields = [fields]
 70        if isinstance(exclude_fields, str):
 71            exclude_fields = [exclude_fields]
 72
 73        fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS"]
 74        exclude_fields = exclude_fields or []
 75        fields = [field for field in fields if field not in exclude_fields]
 76        only_read_bed = fields == ["GT"] and variant_idxs is None and sample_idxs is None
 77
 78        filename_noext = str(self.filename)
 79        if filename_noext[-4:].lower() in (".bed", ".bim", ".fam"):
 80            filename_noext = filename_noext[:-4]
 81
 82        if only_read_bed:
 83            with open(filename_noext + '.fam', 'r') as f:
 84                file_num_samples = sum(1 for _ in f)  # Get sample count from fam file
 85            file_num_variants = None  # Not needed
 86        else:
 87            log.info(f"Reading {filename_noext}.bim")
 88
 89            if separator is None:
 90                with open(filename_noext + ".bim", "r") as file:
 91                    separator = csv.Sniffer().sniff(file.readline()).delimiter
 92
 93            bim = pl.read_csv(
 94                filename_noext + ".bim",
 95                separator=separator,
 96                has_header=False,
 97                new_columns=["#CHROM", "ID", "CM", "POS", "ALT", "REF"],
 98                schema_overrides={
 99                    "#CHROM": pl.String,
100                    "ID": pl.String,
101                    "CM": pl.Float64,
102                    "POS": pl.Int64,
103                    "ALT": pl.String,
104                    "REF": pl.String
105                },
106                null_values=["NA"]
107            ).with_row_index()
108            file_num_variants = bim.height
109
110            if variant_ids is not None:
111                variant_idxs = bim.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy()
112
113            if variant_idxs is None:
114                num_variants = file_num_variants
115                variant_idxs = np.arange(num_variants, dtype=np.uint32)
116            else:
117                num_variants = np.size(variant_idxs)
118                variant_idxs = np.array(variant_idxs, dtype=np.uint32)
119                bim = bim.filter(pl.col("index").is_in(variant_idxs))
120
121            log.info(f"Reading {filename_noext}.fam")
122
123            fam = pl.read_csv(
124                filename_noext + ".fam",
125                separator=separator,
126                has_header=False,
127                new_columns=["Family ID", "IID", "Father ID",
128                             "Mother ID", "Sex code", "Phenotype value"],
129                schema_overrides={
130                    "Family ID": pl.String,
131                    "IID": pl.String,
132                    "Father ID": pl.String,
133                    "Mother ID": pl.String,
134                    "Sex code": pl.String,
135                },
136                null_values=["NA"]
137            ).with_row_index()
138            file_num_samples = fam.height
139
140            if sample_ids is not None:
141                sample_idxs = fam.filter(pl.col("IID").is_in(sample_ids)).select("index").to_series().to_numpy()
142
143            if sample_idxs is None:
144                num_samples = file_num_samples
145            else:
146                num_samples = np.size(sample_idxs)
147                sample_idxs = np.array(sample_idxs, dtype=np.uint32)
148                fam = fam.filter(pl.col("index").is_in(sample_idxs))
149
150        if "GT" in fields:
151            log.info(f"Reading {filename_noext}.bed")
152            pgen_reader = pg.PgenReader(
153                str.encode(filename_noext + ".bed"),
154                raw_sample_ct=file_num_samples,
155                variant_ct=file_num_variants,
156                sample_subset=sample_idxs,
157            )
158
159            if only_read_bed:
160                num_samples = pgen_reader.get_raw_sample_ct()
161                num_variants = pgen_reader.get_variant_ct()
162                variant_idxs = np.arange(num_variants, dtype=np.uint32)
163
164            # required arrays: variant_idxs + sample_idxs + genotypes
165            if not sum_strands:
166                required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4
167            else:
168                required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples
169            log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each")
170
171            if not sum_strands:
172                genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32)  # cannot use int8 because of pgenlib
173                pgen_reader.read_alleles_list(variant_idxs, genotypes)
174                genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2))
175            else:
176                genotypes = np.empty((num_variants, num_samples), dtype=np.int8)
177                pgen_reader.read_list(variant_idxs, genotypes)
178            pgen_reader.close()
179        else:
180            genotypes = None
181
182        log.info("Constructing SNPObject")
183
184        snpobj = SNPObject(
185            calldata_gt=genotypes if "GT" in fields else None,
186            samples=fam.get_column("IID").to_numpy() if "IID" in fields and "IID" in fam.columns else None,
187            **{f'variants_{k.lower()}': bim.get_column(v).to_numpy() if v in fields and v in bim.columns else None
188               for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS'}.items()}
189        )
190
191        log.info("Finished constructing SNPObject")
192        return snpobj

Read a bed fileset (bed, bim, fam) into a SNPObject.

Arguments:
  • fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. To extract all fields, set fields to None. Defaults to None.
  • exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. To exclude no fields, set exclude_fields to None. Defaults to None.
  • sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
  • sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
  • variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
  • variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
  • sum_strands: True if the maternal and paternal strands are to be summed together, False if the strands are to be stored separately. Note that due to the pgenlib backend, when sum_strands is False, 8 times as much RAM is required. Nonetheless, the calldata_gt will only be double the size. WARNING: bed files do not store phase information. If you need it, use vcf or pgen.
  • separator: Separator used in the pvar file. If None, the separator is automatically detected. If the automatic detection fails, please specify the separator manually.
Returns:

snpobj: SNPObject containing the data from the pgen fileset. If sum_strands is False, calldata_gt is stored as a numpy array of shape (num_variants, num_samples, 2) and dtype int8 containing 0, 1. If sum_strands is True, calldata_gt is stored as a numpy array of shape (num_variants, num_samples) and dtype int8 containing 0, 1, 2.

Raises:
  • AssertionError: If both sample_idxs and sample_ids are specified.
  • AssertionError: If both variant_idxs and variant_ids are specified.
@SNPBaseReader.register
class PGENReader(snputils.snp.io.read.base.SNPBaseReader):
 17@SNPBaseReader.register
 18class PGENReader(SNPBaseReader):
 19    def read(
 20        self,
 21        fields: Optional[List[str]] = None,
 22        exclude_fields: Optional[List[str]] = None,
 23        sample_ids: Optional[np.ndarray] = None,
 24        sample_idxs: Optional[np.ndarray] = None,
 25        variant_ids: Optional[np.ndarray] = None,
 26        variant_idxs: Optional[np.ndarray] = None,
 27        sum_strands: bool = False,
 28        separator: str = None,
 29    ) -> SNPObject:
 30        """
 31        Read a pgen fileset (pgen, psam, pvar) into a SNPObject.
 32
 33        Args:
 34            fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject.
 35                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'.
 36                To extract all fields, set fields to None. Defaults to None.
 37            exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject.
 38                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'.
 39                To exclude no fields, set exclude_fields to None. Defaults to None.
 40            sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
 41            sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
 42            variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
 43            variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
 44            sum_strands: True if the maternal and paternal strands are to be summed together,
 45                False if the strands are to be stored separately. Note that due to the pgenlib backend, when sum_strands is False,
 46                8 times as much RAM is required. Nonetheless, the calldata_gt will only be double the size.
 47            separator: Separator used in the pvar file. If None, the separator is automatically detected.
 48                If the automatic detection fails, please specify the separator manually.
 49
 50        Returns:
 51            snpobj: SNPObject containing the data from the pgen fileset.
 52                If sum_strands is False, calldata_gt is stored as a numpy array of shape
 53                (num_variants, num_samples, 2) and dtype int8 containing 0, 1.
 54                If sum_strands is True, calldata_gt is stored as a numpy array of shape
 55                (num_variants, num_samples) and dtype int8 containing 0, 1, 2.
 56
 57        Raises:
 58            AssertionError: If both sample_idxs and sample_ids are specified.
 59            AssertionError: If both variant_idxs and variant_ids are specified.
 60        """
 61        assert (
 62            sample_idxs is None or sample_ids is None
 63        ), "Only one of sample_idxs and sample_ids can be specified"
 64        assert (
 65            variant_idxs is None or variant_ids is None
 66        ), "Only one of variant_idxs and variant_ids can be specified"
 67
 68        if isinstance(fields, str):
 69            fields = [fields]
 70        if isinstance(exclude_fields, str):
 71            exclude_fields = [exclude_fields]
 72
 73        fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS", "FILTER", "QUAL"]
 74        exclude_fields = exclude_fields or []
 75        fields = [field for field in fields if field not in exclude_fields]
 76        only_read_pgen = fields == ["GT"] and variant_idxs is None and sample_idxs is None
 77
 78        filename_noext = str(self.filename)
 79        for ext in [".pgen", ".pvar", ".pvar.zst", ".psam"]:
 80            if filename_noext.endswith(ext):
 81                filename_noext = filename_noext[:-len(ext)]
 82                break
 83
 84        if only_read_pgen:
 85            file_num_samples = None  # Not needed for pgen
 86            file_num_variants = None  # Not needed
 87        else:
 88            pvar_extensions = [".pvar", ".pvar.zst"]
 89            pvar_filename = None
 90            for ext in pvar_extensions:
 91                possible_pvar = filename_noext + ext
 92                if os.path.exists(possible_pvar):
 93                    pvar_filename = possible_pvar
 94                    break
 95            if pvar_filename is None:
 96                raise FileNotFoundError(f"No .pvar or .pvar.zst file found for {filename_noext}")
 97
 98            log.info(f"Reading {pvar_filename}")
 99
100            def open_textfile(filename):
101                if filename.endswith('.zst'):
102                    import zstandard as zstd
103                    return zstd.open(filename, 'rt')
104                else:
105                    return open(filename, 'rt')
106
107            pvar_has_header = True
108            pvar_header_line_num = 0
109            with open_textfile(pvar_filename) as file:
110                for line_num, line in enumerate(file):
111                    if line.startswith("##"):  # Metadata
112                        continue
113                    else:
114                        if separator is None:
115                            separator = csv.Sniffer().sniff(file.readline()).delimiter
116                        if line.startswith("#CHROM"):  # Header
117                            pvar_header_line_num = line_num
118                            header = line.strip().split()
119                            break
120                        elif not line.startswith("#"):  # If no header, look at line 1
121                            pvar_has_header = False
122                            cols_in_pvar = len(line.strip().split(separator))
123                            if cols_in_pvar == 5:
124                                header = ["#CHROM", "ID", "POS", "ALT", "REF"]
125                            elif cols_in_pvar == 6:
126                                header = ["#CHROM", "ID", "CM", "POS", "ALT", "REF"]
127                            else:
128                                raise ValueError(
129                                    f"{pvar_filename} is not a valid pvar file."
130                                )
131                            break
132
133            pvar_reading_args = {
134                'separator': separator,
135                'skip_rows': pvar_header_line_num,
136                'has_header': pvar_has_header,
137                'new_columns': None if pvar_has_header else header,
138                'schema_overrides': {
139                    "#CHROM": pl.String,
140                    "POS": pl.UInt32,
141                    "ID": pl.String,
142                    "REF": pl.String,
143                    "ALT": pl.String,
144                },
145                'null_values': ["NA"],
146            }
147            if pvar_filename.endswith('.zst'):
148                pvar = pl.read_csv(pvar_filename, **pvar_reading_args).lazy()
149            else:
150                pvar = pl.scan_csv(pvar_filename, **pvar_reading_args)
151
152            # We need to track row positions of variants but doing this with lazy operations would
153            # defeat predicate pushdown optimization and force loading the entire DataFrame.
154            # Solution: We only materialize the ID column with indices, keeping memory usage minimal
155            # while maintaining the ability to map between variant IDs and their file positions.
156            variant_id_idxs = pvar.select("ID").with_row_index().collect()
157
158            file_num_variants = variant_id_idxs.height
159
160            if variant_ids is not None:
161                num_variants = np.size(variant_ids)
162                pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect()
163                variant_idxs = variant_id_idxs.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy()
164            elif variant_idxs is not None:
165                num_variants = np.size(variant_idxs)
166                variant_idxs = np.array(variant_idxs, dtype=np.uint32)
167                variant_ids = variant_id_idxs.filter(pl.col("index").is_in(variant_idxs)).select("ID")
168                pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect()
169            else:
170                num_variants = file_num_variants
171                variant_idxs = np.arange(num_variants, dtype=np.uint32)
172                pvar = pvar.collect()
173
174            log.info(f"Reading {filename_noext}.psam")
175
176            with open(filename_noext + ".psam") as file:
177                first_line = file.readline().strip()
178                psam_has_header = first_line.startswith(("#FID", "FID", "#IID", "IID"))
179
180            psam = pl.read_csv(
181                filename_noext + ".psam",
182                separator=separator,
183                has_header=psam_has_header,
184                new_columns=None if psam_has_header else ["FID", "IID", "PAT", "MAT", "SEX", "PHENO1"],
185                null_values=["NA"],
186            ).with_row_index()
187            if "#IID" in psam.columns:
188                psam = psam.rename({"#IID": "IID"})
189            if "#FID" in psam.columns:
190                psam = psam.rename({"#FID": "FID"})
191
192            file_num_samples = psam.height
193
194            if sample_ids is not None:
195                num_samples = np.size(sample_ids)
196                psam = psam.filter(pl.col("IID").is_in(sample_ids))
197                sample_idxs = psam.select("index").to_series().to_numpy()
198            elif sample_idxs is not None:
199                num_samples = np.size(sample_idxs)
200                sample_idxs = np.array(sample_idxs, dtype=np.uint32)
201                psam = psam.filter(pl.col("index").is_in(sample_idxs))
202            else:
203                num_samples = file_num_samples
204
205        if "GT" in fields:
206            log.info(f"Reading {filename_noext}.pgen")
207            pgen_reader = pg.PgenReader(
208                str.encode(filename_noext + ".pgen"),
209                raw_sample_ct=file_num_samples,
210                variant_ct=file_num_variants,
211                sample_subset=sample_idxs,
212            )
213
214            if only_read_pgen:
215                num_samples = pgen_reader.get_raw_sample_ct()
216                num_variants = pgen_reader.get_variant_ct()
217                variant_idxs = np.arange(num_variants, dtype=np.uint32)
218
219            # required arrays: variant_idxs + sample_idxs + genotypes
220            if not sum_strands:
221                required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4
222            else:
223                required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples
224            log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each")
225
226            if not sum_strands:
227                genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32)  # cannot use int8 because of pgenlib
228                pgen_reader.read_alleles_list(variant_idxs, genotypes)
229                genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2))
230            else:
231                genotypes = np.empty((num_variants, num_samples), dtype=np.int8)
232                pgen_reader.read_list(variant_idxs, genotypes)
233            pgen_reader.close()
234        else:
235            genotypes = None
236
237        log.info("Constructing SNPObject")
238
239        snpobj = SNPObject(
240            calldata_gt=genotypes if "GT" in fields else None,
241            samples=psam.get_column("IID").to_numpy() if "IID" in fields and "IID" in psam.columns else None,
242            **{f'variants_{k.lower()}': pvar.get_column(v).to_numpy() if v in fields and v in pvar.columns else None
243               for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS', 'filter_pass': 'FILTER', 'qual': 'QUAL'}.items()}
244        )
245
246        log.info("Finished constructing SNPObject")
247        return snpobj

Abstract class for SNP readers.

Attributes:
  • _filename: The path to the file storing SNP data.
def read( self, fields: Optional[List[str]] = None, exclude_fields: Optional[List[str]] = None, sample_ids: Optional[numpy.ndarray] = None, sample_idxs: Optional[numpy.ndarray] = None, variant_ids: Optional[numpy.ndarray] = None, variant_idxs: Optional[numpy.ndarray] = None, sum_strands: bool = False, separator: str = None) -> snputils.snp.genobj.SNPObject:
 19    def read(
 20        self,
 21        fields: Optional[List[str]] = None,
 22        exclude_fields: Optional[List[str]] = None,
 23        sample_ids: Optional[np.ndarray] = None,
 24        sample_idxs: Optional[np.ndarray] = None,
 25        variant_ids: Optional[np.ndarray] = None,
 26        variant_idxs: Optional[np.ndarray] = None,
 27        sum_strands: bool = False,
 28        separator: str = None,
 29    ) -> SNPObject:
 30        """
 31        Read a pgen fileset (pgen, psam, pvar) into a SNPObject.
 32
 33        Args:
 34            fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject.
 35                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'.
 36                To extract all fields, set fields to None. Defaults to None.
 37            exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject.
 38                Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'.
 39                To exclude no fields, set exclude_fields to None. Defaults to None.
 40            sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
 41            sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
 42            variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
 43            variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
 44            sum_strands: True if the maternal and paternal strands are to be summed together,
 45                False if the strands are to be stored separately. Note that due to the pgenlib backend, when sum_strands is False,
 46                8 times as much RAM is required. Nonetheless, the calldata_gt will only be double the size.
 47            separator: Separator used in the pvar file. If None, the separator is automatically detected.
 48                If the automatic detection fails, please specify the separator manually.
 49
 50        Returns:
 51            snpobj: SNPObject containing the data from the pgen fileset.
 52                If sum_strands is False, calldata_gt is stored as a numpy array of shape
 53                (num_variants, num_samples, 2) and dtype int8 containing 0, 1.
 54                If sum_strands is True, calldata_gt is stored as a numpy array of shape
 55                (num_variants, num_samples) and dtype int8 containing 0, 1, 2.
 56
 57        Raises:
 58            AssertionError: If both sample_idxs and sample_ids are specified.
 59            AssertionError: If both variant_idxs and variant_ids are specified.
 60        """
 61        assert (
 62            sample_idxs is None or sample_ids is None
 63        ), "Only one of sample_idxs and sample_ids can be specified"
 64        assert (
 65            variant_idxs is None or variant_ids is None
 66        ), "Only one of variant_idxs and variant_ids can be specified"
 67
 68        if isinstance(fields, str):
 69            fields = [fields]
 70        if isinstance(exclude_fields, str):
 71            exclude_fields = [exclude_fields]
 72
 73        fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS", "FILTER", "QUAL"]
 74        exclude_fields = exclude_fields or []
 75        fields = [field for field in fields if field not in exclude_fields]
 76        only_read_pgen = fields == ["GT"] and variant_idxs is None and sample_idxs is None
 77
 78        filename_noext = str(self.filename)
 79        for ext in [".pgen", ".pvar", ".pvar.zst", ".psam"]:
 80            if filename_noext.endswith(ext):
 81                filename_noext = filename_noext[:-len(ext)]
 82                break
 83
 84        if only_read_pgen:
 85            file_num_samples = None  # Not needed for pgen
 86            file_num_variants = None  # Not needed
 87        else:
 88            pvar_extensions = [".pvar", ".pvar.zst"]
 89            pvar_filename = None
 90            for ext in pvar_extensions:
 91                possible_pvar = filename_noext + ext
 92                if os.path.exists(possible_pvar):
 93                    pvar_filename = possible_pvar
 94                    break
 95            if pvar_filename is None:
 96                raise FileNotFoundError(f"No .pvar or .pvar.zst file found for {filename_noext}")
 97
 98            log.info(f"Reading {pvar_filename}")
 99
100            def open_textfile(filename):
101                if filename.endswith('.zst'):
102                    import zstandard as zstd
103                    return zstd.open(filename, 'rt')
104                else:
105                    return open(filename, 'rt')
106
107            pvar_has_header = True
108            pvar_header_line_num = 0
109            with open_textfile(pvar_filename) as file:
110                for line_num, line in enumerate(file):
111                    if line.startswith("##"):  # Metadata
112                        continue
113                    else:
114                        if separator is None:
115                            separator = csv.Sniffer().sniff(file.readline()).delimiter
116                        if line.startswith("#CHROM"):  # Header
117                            pvar_header_line_num = line_num
118                            header = line.strip().split()
119                            break
120                        elif not line.startswith("#"):  # If no header, look at line 1
121                            pvar_has_header = False
122                            cols_in_pvar = len(line.strip().split(separator))
123                            if cols_in_pvar == 5:
124                                header = ["#CHROM", "ID", "POS", "ALT", "REF"]
125                            elif cols_in_pvar == 6:
126                                header = ["#CHROM", "ID", "CM", "POS", "ALT", "REF"]
127                            else:
128                                raise ValueError(
129                                    f"{pvar_filename} is not a valid pvar file."
130                                )
131                            break
132
133            pvar_reading_args = {
134                'separator': separator,
135                'skip_rows': pvar_header_line_num,
136                'has_header': pvar_has_header,
137                'new_columns': None if pvar_has_header else header,
138                'schema_overrides': {
139                    "#CHROM": pl.String,
140                    "POS": pl.UInt32,
141                    "ID": pl.String,
142                    "REF": pl.String,
143                    "ALT": pl.String,
144                },
145                'null_values': ["NA"],
146            }
147            if pvar_filename.endswith('.zst'):
148                pvar = pl.read_csv(pvar_filename, **pvar_reading_args).lazy()
149            else:
150                pvar = pl.scan_csv(pvar_filename, **pvar_reading_args)
151
152            # We need to track row positions of variants but doing this with lazy operations would
153            # defeat predicate pushdown optimization and force loading the entire DataFrame.
154            # Solution: We only materialize the ID column with indices, keeping memory usage minimal
155            # while maintaining the ability to map between variant IDs and their file positions.
156            variant_id_idxs = pvar.select("ID").with_row_index().collect()
157
158            file_num_variants = variant_id_idxs.height
159
160            if variant_ids is not None:
161                num_variants = np.size(variant_ids)
162                pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect()
163                variant_idxs = variant_id_idxs.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy()
164            elif variant_idxs is not None:
165                num_variants = np.size(variant_idxs)
166                variant_idxs = np.array(variant_idxs, dtype=np.uint32)
167                variant_ids = variant_id_idxs.filter(pl.col("index").is_in(variant_idxs)).select("ID")
168                pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect()
169            else:
170                num_variants = file_num_variants
171                variant_idxs = np.arange(num_variants, dtype=np.uint32)
172                pvar = pvar.collect()
173
174            log.info(f"Reading {filename_noext}.psam")
175
176            with open(filename_noext + ".psam") as file:
177                first_line = file.readline().strip()
178                psam_has_header = first_line.startswith(("#FID", "FID", "#IID", "IID"))
179
180            psam = pl.read_csv(
181                filename_noext + ".psam",
182                separator=separator,
183                has_header=psam_has_header,
184                new_columns=None if psam_has_header else ["FID", "IID", "PAT", "MAT", "SEX", "PHENO1"],
185                null_values=["NA"],
186            ).with_row_index()
187            if "#IID" in psam.columns:
188                psam = psam.rename({"#IID": "IID"})
189            if "#FID" in psam.columns:
190                psam = psam.rename({"#FID": "FID"})
191
192            file_num_samples = psam.height
193
194            if sample_ids is not None:
195                num_samples = np.size(sample_ids)
196                psam = psam.filter(pl.col("IID").is_in(sample_ids))
197                sample_idxs = psam.select("index").to_series().to_numpy()
198            elif sample_idxs is not None:
199                num_samples = np.size(sample_idxs)
200                sample_idxs = np.array(sample_idxs, dtype=np.uint32)
201                psam = psam.filter(pl.col("index").is_in(sample_idxs))
202            else:
203                num_samples = file_num_samples
204
205        if "GT" in fields:
206            log.info(f"Reading {filename_noext}.pgen")
207            pgen_reader = pg.PgenReader(
208                str.encode(filename_noext + ".pgen"),
209                raw_sample_ct=file_num_samples,
210                variant_ct=file_num_variants,
211                sample_subset=sample_idxs,
212            )
213
214            if only_read_pgen:
215                num_samples = pgen_reader.get_raw_sample_ct()
216                num_variants = pgen_reader.get_variant_ct()
217                variant_idxs = np.arange(num_variants, dtype=np.uint32)
218
219            # required arrays: variant_idxs + sample_idxs + genotypes
220            if not sum_strands:
221                required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4
222            else:
223                required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples
224            log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each")
225
226            if not sum_strands:
227                genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32)  # cannot use int8 because of pgenlib
228                pgen_reader.read_alleles_list(variant_idxs, genotypes)
229                genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2))
230            else:
231                genotypes = np.empty((num_variants, num_samples), dtype=np.int8)
232                pgen_reader.read_list(variant_idxs, genotypes)
233            pgen_reader.close()
234        else:
235            genotypes = None
236
237        log.info("Constructing SNPObject")
238
239        snpobj = SNPObject(
240            calldata_gt=genotypes if "GT" in fields else None,
241            samples=psam.get_column("IID").to_numpy() if "IID" in fields and "IID" in psam.columns else None,
242            **{f'variants_{k.lower()}': pvar.get_column(v).to_numpy() if v in fields and v in pvar.columns else None
243               for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS', 'filter_pass': 'FILTER', 'qual': 'QUAL'}.items()}
244        )
245
246        log.info("Finished constructing SNPObject")
247        return snpobj

Read a pgen fileset (pgen, psam, pvar) into a SNPObject.

Arguments:
  • fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. To extract all fields, set fields to None. Defaults to None.
  • exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. To exclude no fields, set exclude_fields to None. Defaults to None.
  • sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
  • sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
  • variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
  • variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
  • sum_strands: True if the maternal and paternal strands are to be summed together, False if the strands are to be stored separately. Note that due to the pgenlib backend, when sum_strands is False, 8 times as much RAM is required. Nonetheless, the calldata_gt will only be double the size.
  • separator: Separator used in the pvar file. If None, the separator is automatically detected. If the automatic detection fails, please specify the separator manually.
Returns:

snpobj: SNPObject containing the data from the pgen fileset. If sum_strands is False, calldata_gt is stored as a numpy array of shape (num_variants, num_samples, 2) and dtype int8 containing 0, 1. If sum_strands is True, calldata_gt is stored as a numpy array of shape (num_variants, num_samples) and dtype int8 containing 0, 1, 2.

Raises:
  • AssertionError: If both sample_idxs and sample_ids are specified.
  • AssertionError: If both variant_idxs and variant_ids are specified.
@SNPBaseReader.register
class VCFReader(snputils.snp.io.read.base.SNPBaseReader):
 18@SNPBaseReader.register
 19class VCFReader(SNPBaseReader):
 20    def read(
 21        self,
 22        fields: Optional[List[str]] = None,
 23        exclude_fields: Optional[List[str]] = None,
 24        rename_fields: Optional[dict] = None,
 25        fills: Optional[dict] = None,
 26        region: Optional[str] = None,
 27        samples: Optional[List[str]] = None,
 28        sum_strands: bool = False,
 29    ) -> SNPObject:
 30        """
 31        Read a vcf file into a SNPObject.
 32
 33        Args:
 34            fields: Fields to extract data for. e.g., ['variants/CHROM', 'variants/POS',
 35                'calldata/GT']. If you are feeling lazy, you can drop the 'variants/'
 36                and 'calldata/' prefixes, in which case the fields will be matched
 37                against fields declared in the VCF header, with variants taking priority
 38                over calldata if a field with the same ID exists both in INFO and FORMAT
 39                headers. I.e., ['CHROM', 'POS', 'DP', 'GT'] will work, although watch out
 40                for fields like 'DP' which can be both INFO and FORMAT. To extract all
 41                fields, provide just the string '*'. To extract all variants fields
 42                (including all INFO fields) provide 'variants/*'. To extract all
 43                calldata fields (i.e., defined in FORMAT headers) provide 'calldata/*'.
 44            exclude_fields: Fields to exclude. E.g., for use in combination with fields='*'.
 45            rename_fields: Fields to be renamed. Should be a dictionary mapping old to new names.
 46            fills: Override the fill value used for empty values. Should be a dictionary
 47                mapping field names to fill values.
 48            region: Genomic region to extract variants for. If provided, should be a
 49                tabix-style region string, which can be either just a chromosome name
 50                (e.g., '2L'), or a chromosome name followed by 1-based beginning and
 51                end coordinates (e.g., '2L:100000-200000'). Note that only variants
 52                whose start position (POS) is within the requested range will be included.
 53                This is slightly different from the default tabix behaviour, where a
 54                variant (e.g., deletion) may be included if its position (POS) occurs
 55                before the requested region but its reference allele overlaps the
 56                region - such a variant will not be included in the data returned
 57                by this function.
 58            samples: Selection of samples to extract calldata for. If provided, should be
 59                a list of strings giving sample identifiers. May also be a list of
 60                integers giving indices of selected samples.
 61            sum_strands: True if the maternal and paternal strands are to be summed together,
 62            False if the strands are to be stored separately.
 63
 64        Returns:
 65            snpobj: SNPObject containing the data from the vcf file.
 66                If sum_strands is False, calldata_gt is stored as a numpy array of shape
 67                (num_variants, num_samples, 2) and dtype int8 containing 0, 1.
 68                If sum_strands is True, calldata_gt is stored as a numpy array of shape
 69                (num_variants, num_samples) and dtype int8 containing 0, 1, 2.
 70        """
 71        log.info(f"Reading {self.filename}")
 72
 73        vcf_dict = allel.read_vcf(
 74            str(self.filename),
 75            fields=fields,
 76            exclude_fields=exclude_fields,
 77            rename_fields=rename_fields,
 78            fills=fills,
 79            region=region,
 80            samples=samples,
 81            alt_number=1,
 82        )
 83        assert vcf_dict is not None  # suppress Flake8 warning
 84
 85        genotypes = vcf_dict["calldata/GT"].astype(np.int8)
 86        if sum_strands:
 87            genotypes = genotypes.sum(axis=2, dtype=np.int8)
 88
 89        snpobj = SNPObject(
 90            calldata_gt=genotypes,
 91            samples=vcf_dict["samples"],
 92            variants_ref=vcf_dict["variants/REF"],
 93            variants_alt=vcf_dict["variants/ALT"],
 94            variants_chrom=vcf_dict["variants/CHROM"],
 95            variants_filter_pass=vcf_dict["variants/FILTER_PASS"],
 96            variants_id=vcf_dict["variants/ID"],
 97            variants_pos=vcf_dict["variants/POS"],
 98            variants_qual=vcf_dict["variants/QUAL"],
 99        )
100
101        log.info(f"Finished reading {self.filename}")
102        return snpobj

Abstract class for SNP readers.

Attributes:
  • _filename: The path to the file storing SNP data.
def read( self, fields: Optional[List[str]] = None, exclude_fields: Optional[List[str]] = None, rename_fields: Optional[dict] = None, fills: Optional[dict] = None, region: Optional[str] = None, samples: Optional[List[str]] = None, sum_strands: bool = False) -> snputils.snp.genobj.SNPObject:
 20    def read(
 21        self,
 22        fields: Optional[List[str]] = None,
 23        exclude_fields: Optional[List[str]] = None,
 24        rename_fields: Optional[dict] = None,
 25        fills: Optional[dict] = None,
 26        region: Optional[str] = None,
 27        samples: Optional[List[str]] = None,
 28        sum_strands: bool = False,
 29    ) -> SNPObject:
 30        """
 31        Read a vcf file into a SNPObject.
 32
 33        Args:
 34            fields: Fields to extract data for. e.g., ['variants/CHROM', 'variants/POS',
 35                'calldata/GT']. If you are feeling lazy, you can drop the 'variants/'
 36                and 'calldata/' prefixes, in which case the fields will be matched
 37                against fields declared in the VCF header, with variants taking priority
 38                over calldata if a field with the same ID exists both in INFO and FORMAT
 39                headers. I.e., ['CHROM', 'POS', 'DP', 'GT'] will work, although watch out
 40                for fields like 'DP' which can be both INFO and FORMAT. To extract all
 41                fields, provide just the string '*'. To extract all variants fields
 42                (including all INFO fields) provide 'variants/*'. To extract all
 43                calldata fields (i.e., defined in FORMAT headers) provide 'calldata/*'.
 44            exclude_fields: Fields to exclude. E.g., for use in combination with fields='*'.
 45            rename_fields: Fields to be renamed. Should be a dictionary mapping old to new names.
 46            fills: Override the fill value used for empty values. Should be a dictionary
 47                mapping field names to fill values.
 48            region: Genomic region to extract variants for. If provided, should be a
 49                tabix-style region string, which can be either just a chromosome name
 50                (e.g., '2L'), or a chromosome name followed by 1-based beginning and
 51                end coordinates (e.g., '2L:100000-200000'). Note that only variants
 52                whose start position (POS) is within the requested range will be included.
 53                This is slightly different from the default tabix behaviour, where a
 54                variant (e.g., deletion) may be included if its position (POS) occurs
 55                before the requested region but its reference allele overlaps the
 56                region - such a variant will not be included in the data returned
 57                by this function.
 58            samples: Selection of samples to extract calldata for. If provided, should be
 59                a list of strings giving sample identifiers. May also be a list of
 60                integers giving indices of selected samples.
 61            sum_strands: True if the maternal and paternal strands are to be summed together,
 62            False if the strands are to be stored separately.
 63
 64        Returns:
 65            snpobj: SNPObject containing the data from the vcf file.
 66                If sum_strands is False, calldata_gt is stored as a numpy array of shape
 67                (num_variants, num_samples, 2) and dtype int8 containing 0, 1.
 68                If sum_strands is True, calldata_gt is stored as a numpy array of shape
 69                (num_variants, num_samples) and dtype int8 containing 0, 1, 2.
 70        """
 71        log.info(f"Reading {self.filename}")
 72
 73        vcf_dict = allel.read_vcf(
 74            str(self.filename),
 75            fields=fields,
 76            exclude_fields=exclude_fields,
 77            rename_fields=rename_fields,
 78            fills=fills,
 79            region=region,
 80            samples=samples,
 81            alt_number=1,
 82        )
 83        assert vcf_dict is not None  # suppress Flake8 warning
 84
 85        genotypes = vcf_dict["calldata/GT"].astype(np.int8)
 86        if sum_strands:
 87            genotypes = genotypes.sum(axis=2, dtype=np.int8)
 88
 89        snpobj = SNPObject(
 90            calldata_gt=genotypes,
 91            samples=vcf_dict["samples"],
 92            variants_ref=vcf_dict["variants/REF"],
 93            variants_alt=vcf_dict["variants/ALT"],
 94            variants_chrom=vcf_dict["variants/CHROM"],
 95            variants_filter_pass=vcf_dict["variants/FILTER_PASS"],
 96            variants_id=vcf_dict["variants/ID"],
 97            variants_pos=vcf_dict["variants/POS"],
 98            variants_qual=vcf_dict["variants/QUAL"],
 99        )
100
101        log.info(f"Finished reading {self.filename}")
102        return snpobj

Read a vcf file into a SNPObject.

Arguments:
  • fields: Fields to extract data for. e.g., ['variants/CHROM', 'variants/POS', 'calldata/GT']. If you are feeling lazy, you can drop the 'variants/' and 'calldata/' prefixes, in which case the fields will be matched against fields declared in the VCF header, with variants taking priority over calldata if a field with the same ID exists both in INFO and FORMAT headers. I.e., ['CHROM', 'POS', 'DP', 'GT'] will work, although watch out for fields like 'DP' which can be both INFO and FORMAT. To extract all fields, provide just the string ''. To extract all variants fields (including all INFO fields) provide 'variants/'. To extract all calldata fields (i.e., defined in FORMAT headers) provide 'calldata/*'.
  • exclude_fields: Fields to exclude. E.g., for use in combination with fields='*'.
  • rename_fields: Fields to be renamed. Should be a dictionary mapping old to new names.
  • fills: Override the fill value used for empty values. Should be a dictionary mapping field names to fill values.
  • region: Genomic region to extract variants for. If provided, should be a tabix-style region string, which can be either just a chromosome name (e.g., '2L'), or a chromosome name followed by 1-based beginning and end coordinates (e.g., '2L:100000-200000'). Note that only variants whose start position (POS) is within the requested range will be included. This is slightly different from the default tabix behaviour, where a variant (e.g., deletion) may be included if its position (POS) occurs before the requested region but its reference allele overlaps the region - such a variant will not be included in the data returned by this function.
  • samples: Selection of samples to extract calldata for. If provided, should be a list of strings giving sample identifiers. May also be a list of integers giving indices of selected samples.
  • sum_strands: True if the maternal and paternal strands are to be summed together,
  • False if the strands are to be stored separately.
Returns:

snpobj: SNPObject containing the data from the vcf file. If sum_strands is False, calldata_gt is stored as a numpy array of shape (num_variants, num_samples, 2) and dtype int8 containing 0, 1. If sum_strands is True, calldata_gt is stored as a numpy array of shape (num_variants, num_samples) and dtype int8 containing 0, 1, 2.

class BEDWriter:
 14class BEDWriter:
 15    """Writes an object in bed/bim/fam formats in the specified output path.
 16
 17    Args:
 18        snpobj: The SNPObject to be written.
 19        file: The output file path.
 20
 21    """
 22
 23    def __init__(self, snpobj: SNPObject, filename: str):
 24        self.__snpobj = snpobj.copy()
 25        self.__filename = Path(filename)
 26
 27    def write(
 28            self,
 29            rename_missing_values: bool = True, 
 30            before: Union[int, float, str] = -1, 
 31            after: Union[int, float, str] = '.'
 32        ):
 33        """
 34        Writes the SNPObject to bed/bim/fam formats.
 35
 36        Args:
 37            rename_missing_values (bool, optional):
 38                If True, renames potential missing values in `snpobj.calldata_gt` before writing. 
 39                Defaults to True.
 40            before (int, float, or str, default=-1): 
 41                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
 42                Default is -1.
 43            after (int, float, or str, default='.'): 
 44                The value that will replace `before`. Default is '.'.
 45        """
 46        # Save .bed file
 47        if self.__filename.suffix != '.bed':
 48            self.__filename = self.__filename.with_suffix('.bed')
 49
 50        log.info(f"Writing .bed file: {self.__filename}")
 51
 52        # Optionally rename potential missing values in `snpobj.calldata_gt` before writing
 53        if rename_missing_values:
 54            self.__snpobj.rename_missings(before=before, after=after, inplace=True)
 55
 56        # If the input matrix has three dimensions, it indicates that the data is divided into two strands.
 57        if len(self.__snpobj.calldata_gt.shape) == 3:
 58            # Sum the two strands
 59            self.__snpobj.calldata_gt = self.__snpobj.calldata_gt.transpose(1, 0, 2).sum(axis=2, dtype=np.int8)
 60
 61        # Infer the number of samples and variants from the matrix
 62        samples, variants = self.__snpobj.calldata_gt.shape
 63
 64        # Define the PgenWriter to save the data
 65        data_save = pg.PgenWriter(filename=str(self.__filename).encode('utf-8'),
 66                                  sample_ct=samples,
 67                                  variant_ct=variants,
 68                                  nonref_flags=True,
 69                                  hardcall_phase_present=False,
 70                                  dosage_present=True,
 71                                  dosage_phase_present=False)
 72
 73        # Fill the data_save object with the matrix of individuals x variants
 74        for snp_i in range(0, variants):
 75            data_save.append_biallelic(np.ascontiguousarray(self.__snpobj.calldata_gt[:, snp_i]))
 76
 77        # Save the .bed file
 78        data_save.close()
 79
 80        log.info(f"Finished writing .bed file: {self.__filename}")
 81
 82        # Remove .bed from the file name
 83        if self.__filename.suffix == '.bed':
 84            self.__filename = self.__filename.with_suffix('')
 85
 86        # Save .fam file
 87        log.info(f"Writing .fam file: {self.__filename}")
 88
 89        # Fill .fam file
 90        fam_file = pd.DataFrame(columns=['fid', 'iid', 'father', 'mother', 'gender', 'trait'])
 91        fam_file['iid'] = self.__snpobj.samples
 92        fam_file['fid'] = self.__snpobj.samples
 93
 94        # Save .fam file
 95        fam_file.to_csv(self.__filename.with_suffix('.fam'), sep='\t', index=False, header=False)
 96        log.info(f"Finished writing .fam file: {self.__filename}")
 97
 98        # Save .bim file
 99        log.info(f"Writing .bim file: {self.__filename}")
100
101        # Fill .bim file
102        bim_file = pd.DataFrame(columns=['chrom', 'snp', 'cm', 'pos', 'a0', 'a1'])
103        bim_file['chrom'] = self.__snpobj.variants_chrom
104        bim_file['snp'] = self.__snpobj.variants_id
105        bim_file['cm'] = 0  # TODO: read, save and write too if available?
106        log.warning("The .bim file is being saved with 0 cM values.")
107        bim_file['pos'] = self.__snpobj.variants_pos
108        bim_file['a0'] = self.__snpobj.variants_alt
109        bim_file['a1'] = self.__snpobj.variants_ref
110
111        # Save .bim file
112        bim_file.to_csv(self.__filename.with_suffix('.bim'), sep='\t', index=False, header=False)
113        log.info(f"Finished writing .bim file: {self.__filename}")

Writes an object in bed/bim/fam formats in the specified output path.

Arguments:
  • snpobj: The SNPObject to be written.
  • file: The output file path.
BEDWriter(snpobj: snputils.snp.genobj.SNPObject, filename: str)
23    def __init__(self, snpobj: SNPObject, filename: str):
24        self.__snpobj = snpobj.copy()
25        self.__filename = Path(filename)
def write( self, rename_missing_values: bool = True, before: Union[int, float, str] = -1, after: Union[int, float, str] = '.'):
 27    def write(
 28            self,
 29            rename_missing_values: bool = True, 
 30            before: Union[int, float, str] = -1, 
 31            after: Union[int, float, str] = '.'
 32        ):
 33        """
 34        Writes the SNPObject to bed/bim/fam formats.
 35
 36        Args:
 37            rename_missing_values (bool, optional):
 38                If True, renames potential missing values in `snpobj.calldata_gt` before writing. 
 39                Defaults to True.
 40            before (int, float, or str, default=-1): 
 41                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
 42                Default is -1.
 43            after (int, float, or str, default='.'): 
 44                The value that will replace `before`. Default is '.'.
 45        """
 46        # Save .bed file
 47        if self.__filename.suffix != '.bed':
 48            self.__filename = self.__filename.with_suffix('.bed')
 49
 50        log.info(f"Writing .bed file: {self.__filename}")
 51
 52        # Optionally rename potential missing values in `snpobj.calldata_gt` before writing
 53        if rename_missing_values:
 54            self.__snpobj.rename_missings(before=before, after=after, inplace=True)
 55
 56        # If the input matrix has three dimensions, it indicates that the data is divided into two strands.
 57        if len(self.__snpobj.calldata_gt.shape) == 3:
 58            # Sum the two strands
 59            self.__snpobj.calldata_gt = self.__snpobj.calldata_gt.transpose(1, 0, 2).sum(axis=2, dtype=np.int8)
 60
 61        # Infer the number of samples and variants from the matrix
 62        samples, variants = self.__snpobj.calldata_gt.shape
 63
 64        # Define the PgenWriter to save the data
 65        data_save = pg.PgenWriter(filename=str(self.__filename).encode('utf-8'),
 66                                  sample_ct=samples,
 67                                  variant_ct=variants,
 68                                  nonref_flags=True,
 69                                  hardcall_phase_present=False,
 70                                  dosage_present=True,
 71                                  dosage_phase_present=False)
 72
 73        # Fill the data_save object with the matrix of individuals x variants
 74        for snp_i in range(0, variants):
 75            data_save.append_biallelic(np.ascontiguousarray(self.__snpobj.calldata_gt[:, snp_i]))
 76
 77        # Save the .bed file
 78        data_save.close()
 79
 80        log.info(f"Finished writing .bed file: {self.__filename}")
 81
 82        # Remove .bed from the file name
 83        if self.__filename.suffix == '.bed':
 84            self.__filename = self.__filename.with_suffix('')
 85
 86        # Save .fam file
 87        log.info(f"Writing .fam file: {self.__filename}")
 88
 89        # Fill .fam file
 90        fam_file = pd.DataFrame(columns=['fid', 'iid', 'father', 'mother', 'gender', 'trait'])
 91        fam_file['iid'] = self.__snpobj.samples
 92        fam_file['fid'] = self.__snpobj.samples
 93
 94        # Save .fam file
 95        fam_file.to_csv(self.__filename.with_suffix('.fam'), sep='\t', index=False, header=False)
 96        log.info(f"Finished writing .fam file: {self.__filename}")
 97
 98        # Save .bim file
 99        log.info(f"Writing .bim file: {self.__filename}")
100
101        # Fill .bim file
102        bim_file = pd.DataFrame(columns=['chrom', 'snp', 'cm', 'pos', 'a0', 'a1'])
103        bim_file['chrom'] = self.__snpobj.variants_chrom
104        bim_file['snp'] = self.__snpobj.variants_id
105        bim_file['cm'] = 0  # TODO: read, save and write too if available?
106        log.warning("The .bim file is being saved with 0 cM values.")
107        bim_file['pos'] = self.__snpobj.variants_pos
108        bim_file['a0'] = self.__snpobj.variants_alt
109        bim_file['a1'] = self.__snpobj.variants_ref
110
111        # Save .bim file
112        bim_file.to_csv(self.__filename.with_suffix('.bim'), sep='\t', index=False, header=False)
113        log.info(f"Finished writing .bim file: {self.__filename}")

Writes the SNPObject to bed/bim/fam formats.

Arguments:
  • rename_missing_values (bool, optional): If True, renames potential missing values in snpobj.calldata_gt before writing. Defaults to True.
  • before (int, float, or str, default=-1): The current representation of missing values in calldata_gt. Common values might be -1, '.', or NaN. Default is -1.
  • after (int, float, or str, default='.'): The value that will replace before. Default is '.'.
class PGENWriter:
 15class PGENWriter:
 16    """
 17    Writes a genotype object in PGEN format (.pgen, .psam, and .pvar files) in the specified output path.
 18    """
 19
 20    def __init__(self, snpobj: SNPObject, filename: str):
 21        """
 22        Initializes the PGENWriter instance.
 23
 24        Args:
 25            snpobj (SNPObject): The SNPObject containing genotype data to be written.
 26            filename (str): Base path for the output files (excluding extension).
 27        """
 28        self.__snpobj = snpobj
 29        self.__filename = Path(filename)
 30
 31    def write(
 32            self, 
 33            vzs: bool = False,
 34            rename_missing_values: bool = True, 
 35            before: Union[int, float, str] = -1, 
 36            after: Union[int, float, str] = '.'
 37        ):
 38        """
 39        Writes the SNPObject data to .pgen, .psam, and .pvar files.
 40
 41        Args:
 42            vzs (bool, optional): 
 43                If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
 44            rename_missing_values (bool, optional):
 45                If True, renames potential missing values in `snpobj.calldata_gt` before writing. 
 46                Defaults to True.
 47            before (int, float, or str, default=-1): 
 48                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
 49                Default is -1.
 50            after (int, float, or str, default='.'): 
 51                The value that will replace `before`. Default is '.'.
 52        """
 53        file_extensions = (".pgen", ".psam", ".pvar", ".pvar.zst")
 54        if self.__filename.suffix in file_extensions:
 55            self.__filename = self.__filename.with_suffix('')
 56
 57        # Optionally rename potential missing values in `snpobj.calldata_gt` before writing
 58        if rename_missing_values:
 59            self.__snpobj.rename_missings(before=before, after=after, inplace=True)
 60
 61        self.write_pvar(vzs=vzs)
 62        self.write_psam()
 63        self.write_pgen()
 64
 65    def write_pvar(self, vzs: bool = False):
 66        """
 67        Writes variant data to the .pvar file.
 68
 69        Args:
 70            vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
 71        """
 72        output_filename = f"{self.__filename}.pvar"
 73        if vzs:
 74            output_filename += ".zst"
 75            log.info(f"Writing to {output_filename} (compressed)")
 76        else:
 77            log.info(f"Writing to {output_filename}")
 78
 79        df = pl.DataFrame(
 80            {
 81                "#CHROM": self.__snpobj.variants_chrom,
 82                "POS": self.__snpobj.variants_pos,
 83                "ID": self.__snpobj.variants_id,
 84                "REF": self.__snpobj.variants_ref,
 85                "ALT": self.__snpobj.variants_alt,
 86                "FILTER": self.__snpobj.variants_filter_pass,
 87                # TODO: add INFO column to SNPObject and write it to the .pvar file? (if not it's lost)
 88            }
 89        )
 90        # TODO: add header to the .pvar file, if not it's lost
 91
 92        # Write the DataFrame to a CSV string
 93        csv_data = df.write_csv(None, separator="\t")
 94
 95        if vzs:
 96            # Compress the CSV data using zstd
 97            cctx = zstd.ZstdCompressor()
 98            compressed_data = cctx.compress(csv_data.encode('utf-8'))
 99            with open(output_filename, 'wb') as f:
100                f.write(compressed_data)
101        else:
102            with open(output_filename, 'w') as f:
103                f.write(csv_data)
104
105    def write_psam(self):
106        """
107        Writes sample metadata to the .psam file.
108        """
109        log.info(f"Writing {self.__filename}.psam")
110        df = pl.DataFrame(
111            {
112                "IID": self.__snpobj.samples,
113                "SEX": "NA",  # Add SEX as nan for now
114                # TODO: add SEX as Optional column to SNPObject and write it to the .psam file (if not it's lost)
115            }
116        )
117        df.write_csv(f"{self.__filename}.psam", separator="\t")
118
119    def write_pgen(self):
120        """
121        Writes the genotype data to a .pgen file.
122        """
123        log.info(f"Writing to {self.__filename}.pgen")
124        summed_strands = False if self.__snpobj.calldata_gt.ndim == 3 else True
125        if not summed_strands:
126            num_variants, num_samples, num_alleles = self.__snpobj.calldata_gt.shape
127            # Flatten the genotype matrix for pgenlib
128            flat_genotypes = self.__snpobj.calldata_gt.reshape(
129                num_variants, num_samples * num_alleles
130            )
131            with pg.PgenWriter(
132                filename=f"{self.__filename}.pgen".encode('utf-8'),
133                sample_ct=num_samples,
134                variant_ct=num_variants,
135                hardcall_phase_present=True,
136            ) as writer:
137                for variant_index in range(num_variants):
138                    writer.append_alleles(
139                        flat_genotypes[variant_index].astype(np.int32), all_phased=True
140                    )
141        else:
142            num_variants, num_samples = self.__snpobj.calldata_gt.shape
143            # Transpose to (samples, variants)
144            genotypes = self.__snpobj.calldata_gt.T  # Shape is (samples, variants)
145            with pg.PgenWriter(
146                filename=f"{self.__filename}.pgen".encode('utf-8'),
147                sample_ct=num_samples,
148                variant_ct=num_variants,
149                hardcall_phase_present=False,
150            ) as writer:
151                for variant_index in range(num_variants):
152                    variant_genotypes = genotypes[:, variant_index].astype(np.int8)
153                    # Map missing genotypes to -9 if necessary
154                    variant_genotypes[variant_genotypes == -1] = -9
155                    writer.append_biallelic(np.ascontiguousarray(variant_genotypes))

Writes a genotype object in PGEN format (.pgen, .psam, and .pvar files) in the specified output path.

PGENWriter(snpobj: snputils.snp.genobj.SNPObject, filename: str)
20    def __init__(self, snpobj: SNPObject, filename: str):
21        """
22        Initializes the PGENWriter instance.
23
24        Args:
25            snpobj (SNPObject): The SNPObject containing genotype data to be written.
26            filename (str): Base path for the output files (excluding extension).
27        """
28        self.__snpobj = snpobj
29        self.__filename = Path(filename)

Initializes the PGENWriter instance.

Arguments:
  • snpobj (SNPObject): The SNPObject containing genotype data to be written.
  • filename (str): Base path for the output files (excluding extension).
def write( self, vzs: bool = False, rename_missing_values: bool = True, before: Union[int, float, str] = -1, after: Union[int, float, str] = '.'):
31    def write(
32            self, 
33            vzs: bool = False,
34            rename_missing_values: bool = True, 
35            before: Union[int, float, str] = -1, 
36            after: Union[int, float, str] = '.'
37        ):
38        """
39        Writes the SNPObject data to .pgen, .psam, and .pvar files.
40
41        Args:
42            vzs (bool, optional): 
43                If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
44            rename_missing_values (bool, optional):
45                If True, renames potential missing values in `snpobj.calldata_gt` before writing. 
46                Defaults to True.
47            before (int, float, or str, default=-1): 
48                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
49                Default is -1.
50            after (int, float, or str, default='.'): 
51                The value that will replace `before`. Default is '.'.
52        """
53        file_extensions = (".pgen", ".psam", ".pvar", ".pvar.zst")
54        if self.__filename.suffix in file_extensions:
55            self.__filename = self.__filename.with_suffix('')
56
57        # Optionally rename potential missing values in `snpobj.calldata_gt` before writing
58        if rename_missing_values:
59            self.__snpobj.rename_missings(before=before, after=after, inplace=True)
60
61        self.write_pvar(vzs=vzs)
62        self.write_psam()
63        self.write_pgen()

Writes the SNPObject data to .pgen, .psam, and .pvar files.

Arguments:
  • vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
  • rename_missing_values (bool, optional): If True, renames potential missing values in snpobj.calldata_gt before writing. Defaults to True.
  • before (int, float, or str, default=-1): The current representation of missing values in calldata_gt. Common values might be -1, '.', or NaN. Default is -1.
  • after (int, float, or str, default='.'): The value that will replace before. Default is '.'.
def write_pvar(self, vzs: bool = False):
 65    def write_pvar(self, vzs: bool = False):
 66        """
 67        Writes variant data to the .pvar file.
 68
 69        Args:
 70            vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
 71        """
 72        output_filename = f"{self.__filename}.pvar"
 73        if vzs:
 74            output_filename += ".zst"
 75            log.info(f"Writing to {output_filename} (compressed)")
 76        else:
 77            log.info(f"Writing to {output_filename}")
 78
 79        df = pl.DataFrame(
 80            {
 81                "#CHROM": self.__snpobj.variants_chrom,
 82                "POS": self.__snpobj.variants_pos,
 83                "ID": self.__snpobj.variants_id,
 84                "REF": self.__snpobj.variants_ref,
 85                "ALT": self.__snpobj.variants_alt,
 86                "FILTER": self.__snpobj.variants_filter_pass,
 87                # TODO: add INFO column to SNPObject and write it to the .pvar file? (if not it's lost)
 88            }
 89        )
 90        # TODO: add header to the .pvar file, if not it's lost
 91
 92        # Write the DataFrame to a CSV string
 93        csv_data = df.write_csv(None, separator="\t")
 94
 95        if vzs:
 96            # Compress the CSV data using zstd
 97            cctx = zstd.ZstdCompressor()
 98            compressed_data = cctx.compress(csv_data.encode('utf-8'))
 99            with open(output_filename, 'wb') as f:
100                f.write(compressed_data)
101        else:
102            with open(output_filename, 'w') as f:
103                f.write(csv_data)

Writes variant data to the .pvar file.

Arguments:
  • vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
def write_psam(self):
105    def write_psam(self):
106        """
107        Writes sample metadata to the .psam file.
108        """
109        log.info(f"Writing {self.__filename}.psam")
110        df = pl.DataFrame(
111            {
112                "IID": self.__snpobj.samples,
113                "SEX": "NA",  # Add SEX as nan for now
114                # TODO: add SEX as Optional column to SNPObject and write it to the .psam file (if not it's lost)
115            }
116        )
117        df.write_csv(f"{self.__filename}.psam", separator="\t")

Writes sample metadata to the .psam file.

def write_pgen(self):
119    def write_pgen(self):
120        """
121        Writes the genotype data to a .pgen file.
122        """
123        log.info(f"Writing to {self.__filename}.pgen")
124        summed_strands = False if self.__snpobj.calldata_gt.ndim == 3 else True
125        if not summed_strands:
126            num_variants, num_samples, num_alleles = self.__snpobj.calldata_gt.shape
127            # Flatten the genotype matrix for pgenlib
128            flat_genotypes = self.__snpobj.calldata_gt.reshape(
129                num_variants, num_samples * num_alleles
130            )
131            with pg.PgenWriter(
132                filename=f"{self.__filename}.pgen".encode('utf-8'),
133                sample_ct=num_samples,
134                variant_ct=num_variants,
135                hardcall_phase_present=True,
136            ) as writer:
137                for variant_index in range(num_variants):
138                    writer.append_alleles(
139                        flat_genotypes[variant_index].astype(np.int32), all_phased=True
140                    )
141        else:
142            num_variants, num_samples = self.__snpobj.calldata_gt.shape
143            # Transpose to (samples, variants)
144            genotypes = self.__snpobj.calldata_gt.T  # Shape is (samples, variants)
145            with pg.PgenWriter(
146                filename=f"{self.__filename}.pgen".encode('utf-8'),
147                sample_ct=num_samples,
148                variant_ct=num_variants,
149                hardcall_phase_present=False,
150            ) as writer:
151                for variant_index in range(num_variants):
152                    variant_genotypes = genotypes[:, variant_index].astype(np.int8)
153                    # Map missing genotypes to -9 if necessary
154                    variant_genotypes[variant_genotypes == -1] = -9
155                    writer.append_biallelic(np.ascontiguousarray(variant_genotypes))

Writes the genotype data to a .pgen file.

class VCFWriter:
 14class VCFWriter:
 15    """Writes a phenotype object in VCF format in the specified output path."""
 16
 17    def __init__(self, snpobj: SNPObject, filename: str, n_jobs: int = -1, phased: bool = False):
 18        """
 19        Parameters
 20        ----------
 21        snpobj : ``SNPObject`` instance
 22            A SNPObject instance.
 23        file : str
 24            Path to write VCF file.
 25        n_jobs : int, default=-1
 26            Number of jobs to run in parallel. None means 1 unless in a
 27            joblib.parallel_backend context. -1 means using all processors.
 28        phased : bool, default=True
 29            If True, the genotype data is written in the "maternal|paternal"
 30            format. If False, the genotype data is written in the
 31            "maternal/paternal" format.
 32        """
 33        self.__snpobj = snpobj
 34        self.__filename = Path(filename)
 35        self.__n_jobs = n_jobs
 36        self.__phased = phased
 37
 38    def write(
 39            self, 
 40            chrom_partition: bool = False,
 41            rename_missing_values: bool = True, 
 42            before: Union[int, float, str] = -1, 
 43            after: Union[int, float, str] = '.'
 44        ):
 45        """
 46        Writes the SNP data to VCF file(s).
 47
 48        Args:
 49            chrom_partition (bool, optional):
 50                If True, individual VCF files are generated for each chromosome.
 51                If False, a single VCF file containing data for all chromosomes is created. Defaults to False.
 52            rename_missing_values (bool, optional):
 53                If True, renames potential missing values in `snpobj.calldata_gt` before writing. 
 54                Defaults to True.
 55            before (int, float, or str, default=-1): 
 56                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
 57                Default is -1.
 58            after (int, float, or str, default='.'): 
 59                The value that will replace `before`. Default is '.'.
 60        """
 61        self.__chrom_partition = chrom_partition
 62
 63        file_extensions = (".vcf", ".bcf")
 64        if self.__filename.suffix in file_extensions:
 65            self.__file_extension = self.__filename.suffix
 66            self.__filename = self.__filename.with_suffix('')
 67        else:
 68            self.__file_extension = ".vcf"
 69
 70        # Optionally rename potential missing values in `snpobj.calldata_gt` before writing
 71        if rename_missing_values:
 72            self.__snpobj.rename_missings(before=before, after=after, inplace=True)
 73
 74        data = self.__snpobj
 75
 76        if self.__chrom_partition:
 77            chroms = data.unique_chrom
 78
 79            for chrom in chroms:
 80                # Filter to include the data for the chromosome in particular
 81                data_chrom = data.filter_variants(chrom=chrom, inplace=False)
 82
 83                log.debug(f'Storing chromosome {chrom}')
 84                self.write_chromosome_data(chrom, data_chrom)
 85        else:
 86            self.write_chromosome_data("All", data)
 87
 88    def write_chromosome_data(self, chrom, data_chrom):
 89        """
 90        Writes the SNP data for a specific chromosome to a VCF file.
 91
 92        Args:
 93            chrom: The chromosome name.
 94            data_chrom: The SNPObject instance containing the data for the chromosome.
 95        """
 96        # Obtain npy matrix with SNPs
 97        npy = data_chrom.calldata_gt
 98        length, n_samples, num_strands = npy.shape
 99        npy = npy.reshape(length, n_samples*2).T
100
101        # Keep sample names if appropriate
102        data_samples = data_chrom.samples if len(data_chrom.samples) == n_samples else [get_name() for _ in range(n_samples)]
103
104        # Metadata
105        df = pd.DataFrame({
106            "CHROM": data_chrom.variants_chrom,
107            "POS": data_chrom.variants_pos,
108            "ID": data_chrom.variants_id,
109            "REF": data_chrom.variants_ref,
110            "ALT": data_chrom.variants_alt,
111            "QUAL": data_chrom.variants_qual,
112            "FILTER": ["PASS"] * length,
113            "INFO": ["."] * length,
114            "FORMAT": ["GT"] * length
115        })
116
117        # Genotype data for each sample "maternal|paternal"
118        column_data = joblib.Parallel(n_jobs=self.__n_jobs)(
119            joblib.delayed(process_genotype)(npy, i, length, self.__phased) for i in range(n_samples)
120        )
121
122        # Create the DataFrame once with all the columns
123        column_data = pd.DataFrame(np.array(column_data).T, columns=data_samples)
124        df = pd.concat([df, column_data], axis=1)
125
126        # Format output file
127        if chrom == "All":
128            file = self.__filename.with_suffix(self.__file_extension)
129        else:
130            file = self.__filename.parent / f"{self.__filename.stem}_{chrom}{self.__file_extension}"
131
132        # Write header
133        with open(file, "w") as f:
134            f.write("".join([
135                "##fileformat=VCFv4.1\n",
136                '##FORMAT=<ID=GT,Number=1,Type=String,Description="Phased Genotype">\n',
137                *[f"##contig=<ID={chrom}>\n" for chrom in df["CHROM"].unique()],
138                "#" + "\t".join(map(str, df.columns)) + "\n"
139            ]))
140
141        # Write genotype data
142        df.to_csv(file, sep="\t", index=False, mode="a", header=False)

Writes a phenotype object in VCF format in the specified output path.

VCFWriter( snpobj: snputils.snp.genobj.SNPObject, filename: str, n_jobs: int = -1, phased: bool = False)
17    def __init__(self, snpobj: SNPObject, filename: str, n_jobs: int = -1, phased: bool = False):
18        """
19        Parameters
20        ----------
21        snpobj : ``SNPObject`` instance
22            A SNPObject instance.
23        file : str
24            Path to write VCF file.
25        n_jobs : int, default=-1
26            Number of jobs to run in parallel. None means 1 unless in a
27            joblib.parallel_backend context. -1 means using all processors.
28        phased : bool, default=True
29            If True, the genotype data is written in the "maternal|paternal"
30            format. If False, the genotype data is written in the
31            "maternal/paternal" format.
32        """
33        self.__snpobj = snpobj
34        self.__filename = Path(filename)
35        self.__n_jobs = n_jobs
36        self.__phased = phased

Parameters

snpobj : SNPObject instance A SNPObject instance. file : str Path to write VCF file. n_jobs : int, default=-1 Number of jobs to run in parallel. None means 1 unless in a joblib.parallel_backend context. -1 means using all processors. phased : bool, default=True If True, the genotype data is written in the "maternal|paternal" format. If False, the genotype data is written in the "maternal/paternal" format.

def write( self, chrom_partition: bool = False, rename_missing_values: bool = True, before: Union[int, float, str] = -1, after: Union[int, float, str] = '.'):
38    def write(
39            self, 
40            chrom_partition: bool = False,
41            rename_missing_values: bool = True, 
42            before: Union[int, float, str] = -1, 
43            after: Union[int, float, str] = '.'
44        ):
45        """
46        Writes the SNP data to VCF file(s).
47
48        Args:
49            chrom_partition (bool, optional):
50                If True, individual VCF files are generated for each chromosome.
51                If False, a single VCF file containing data for all chromosomes is created. Defaults to False.
52            rename_missing_values (bool, optional):
53                If True, renames potential missing values in `snpobj.calldata_gt` before writing. 
54                Defaults to True.
55            before (int, float, or str, default=-1): 
56                The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN.
57                Default is -1.
58            after (int, float, or str, default='.'): 
59                The value that will replace `before`. Default is '.'.
60        """
61        self.__chrom_partition = chrom_partition
62
63        file_extensions = (".vcf", ".bcf")
64        if self.__filename.suffix in file_extensions:
65            self.__file_extension = self.__filename.suffix
66            self.__filename = self.__filename.with_suffix('')
67        else:
68            self.__file_extension = ".vcf"
69
70        # Optionally rename potential missing values in `snpobj.calldata_gt` before writing
71        if rename_missing_values:
72            self.__snpobj.rename_missings(before=before, after=after, inplace=True)
73
74        data = self.__snpobj
75
76        if self.__chrom_partition:
77            chroms = data.unique_chrom
78
79            for chrom in chroms:
80                # Filter to include the data for the chromosome in particular
81                data_chrom = data.filter_variants(chrom=chrom, inplace=False)
82
83                log.debug(f'Storing chromosome {chrom}')
84                self.write_chromosome_data(chrom, data_chrom)
85        else:
86            self.write_chromosome_data("All", data)

Writes the SNP data to VCF file(s).

Arguments:
  • chrom_partition (bool, optional): If True, individual VCF files are generated for each chromosome. If False, a single VCF file containing data for all chromosomes is created. Defaults to False.
  • rename_missing_values (bool, optional): If True, renames potential missing values in snpobj.calldata_gt before writing. Defaults to True.
  • before (int, float, or str, default=-1): The current representation of missing values in calldata_gt. Common values might be -1, '.', or NaN. Default is -1.
  • after (int, float, or str, default='.'): The value that will replace before. Default is '.'.
def write_chromosome_data(self, chrom, data_chrom):
 88    def write_chromosome_data(self, chrom, data_chrom):
 89        """
 90        Writes the SNP data for a specific chromosome to a VCF file.
 91
 92        Args:
 93            chrom: The chromosome name.
 94            data_chrom: The SNPObject instance containing the data for the chromosome.
 95        """
 96        # Obtain npy matrix with SNPs
 97        npy = data_chrom.calldata_gt
 98        length, n_samples, num_strands = npy.shape
 99        npy = npy.reshape(length, n_samples*2).T
100
101        # Keep sample names if appropriate
102        data_samples = data_chrom.samples if len(data_chrom.samples) == n_samples else [get_name() for _ in range(n_samples)]
103
104        # Metadata
105        df = pd.DataFrame({
106            "CHROM": data_chrom.variants_chrom,
107            "POS": data_chrom.variants_pos,
108            "ID": data_chrom.variants_id,
109            "REF": data_chrom.variants_ref,
110            "ALT": data_chrom.variants_alt,
111            "QUAL": data_chrom.variants_qual,
112            "FILTER": ["PASS"] * length,
113            "INFO": ["."] * length,
114            "FORMAT": ["GT"] * length
115        })
116
117        # Genotype data for each sample "maternal|paternal"
118        column_data = joblib.Parallel(n_jobs=self.__n_jobs)(
119            joblib.delayed(process_genotype)(npy, i, length, self.__phased) for i in range(n_samples)
120        )
121
122        # Create the DataFrame once with all the columns
123        column_data = pd.DataFrame(np.array(column_data).T, columns=data_samples)
124        df = pd.concat([df, column_data], axis=1)
125
126        # Format output file
127        if chrom == "All":
128            file = self.__filename.with_suffix(self.__file_extension)
129        else:
130            file = self.__filename.parent / f"{self.__filename.stem}_{chrom}{self.__file_extension}"
131
132        # Write header
133        with open(file, "w") as f:
134            f.write("".join([
135                "##fileformat=VCFv4.1\n",
136                '##FORMAT=<ID=GT,Number=1,Type=String,Description="Phased Genotype">\n',
137                *[f"##contig=<ID={chrom}>\n" for chrom in df["CHROM"].unique()],
138                "#" + "\t".join(map(str, df.columns)) + "\n"
139            ]))
140
141        # Write genotype data
142        df.to_csv(file, sep="\t", index=False, mode="a", header=False)

Writes the SNP data for a specific chromosome to a VCF file.

Arguments:
  • chrom: The chromosome name.
  • data_chrom: The SNPObject instance containing the data for the chromosome.