snputils.snp.io
1from .read import SNPReader, BEDReader, PGENReader, VCFReader, read_snp, read_bed, read_pgen, read_vcf 2from .write import BEDWriter, PGENWriter, VCFWriter 3 4__all__ = ['read_snp', 'read_bed', 'read_pgen', 'read_vcf', 5 'SNPReader', 'BEDReader', 'PGENReader', 'VCFReader', 6 'BEDWriter', 'PGENWriter', 'VCFWriter']
8def read_snp(filename: Union[str, pathlib.Path], **kwargs) -> SNPObject: 9 """ 10 Automatically detect the file format and read it into a SNPObject. 11 12 Args: 13 filename: Filename of the file to read. 14 **kwargs: Additional arguments passed to the reader method. 15 16 Raises: 17 ValueError: If the filename does not have an extension or the extension is not supported. 18 """ 19 from snputils.snp.io.read.auto import SNPReader 20 21 return SNPReader(filename).read(**kwargs)
Automatically detect the file format and read it into a SNPObject.
Arguments:
- filename: Filename of the file to read.
- **kwargs: Additional arguments passed to the reader method.
Raises:
- ValueError: If the filename does not have an extension or the extension is not supported.
24def read_bed(filename: Union[str, pathlib.Path], **kwargs) -> SNPObject: 25 """ 26 Read a BED fileset into a SNPObject. 27 28 Args: 29 filename: Filename of the BED fileset to read. 30 **kwargs: Additional arguments passed to the reader method. See :class:`snputils.snp.io.read.bed.BEDReader` for possible parameters. 31 """ 32 from snputils.snp.io.read.bed import BEDReader 33 34 return BEDReader(filename).read(**kwargs)
Read a BED fileset into a SNPObject.
Arguments:
- filename: Filename of the BED fileset to read.
- **kwargs: Additional arguments passed to the reader method. See
snputils.snp.io.read.bed.BEDReader
for possible parameters.
37def read_pgen(filename: Union[str, pathlib.Path], **kwargs) -> SNPObject: 38 """ 39 Read a PGEN fileset into a SNPObject. 40 41 Args: 42 filename: Filename of the PGEN fileset to read. 43 **kwargs: Additional arguments passed to the reader method. See :class:`snputils.snp.io.read.pgen.PGENReader` for possible parameters. 44 """ 45 from snputils.snp.io.read.pgen import PGENReader 46 47 return PGENReader(filename).read(**kwargs)
Read a PGEN fileset into a SNPObject.
Arguments:
- filename: Filename of the PGEN fileset to read.
- **kwargs: Additional arguments passed to the reader method. See
snputils.snp.io.read.pgen.PGENReader
for possible parameters.
50def read_vcf(filename: Union[str, pathlib.Path], 51 backend: str = 'polars', 52 **kwargs) -> SNPObject: 53 """ 54 Read a VCF fileset into a SNPObject. 55 56 Args: 57 filename: Filename of the VCF fileset to read. 58 backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. 59 **kwargs: Additional arguments passed to the reader method. See :class:`snputils.snp.io.read.vcf.VCFReader` for possible parameters. 60 """ 61 from snputils.snp.io.read.vcf import VCFReader, VCFReaderPolars 62 if backend == 'polars': 63 print(f"Reading {filename} with polars backend") 64 return VCFReaderPolars(filename).read(**kwargs) 65 else: 66 print(f"Reading {filename} with scikit-allel backend") 67 return VCFReader(filename).read(**kwargs)
Read a VCF fileset into a SNPObject.
Arguments:
- filename: Filename of the VCF fileset to read.
- backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'.
- **kwargs: Additional arguments passed to the reader method. See
snputils.snp.io.read.vcf.VCFReader
for possible parameters.
8class SNPReader: 9 def __new__(cls, 10 filename: Union[str, pathlib.Path], 11 vcf_backend: str = 'polars') -> SNPReader: 12 """ 13 Automatically detect the SNP file format from the file extension, and return its corresponding reader. 14 15 Args: 16 filename: Filename of the file to read. 17 vcf_backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. Default is 'polars'. 18 19 Raises: 20 ValueError: If the filename does not have an extension or the extension is not supported. 21 """ 22 filename = pathlib.Path(filename) 23 suffixes = filename.suffixes 24 if not suffixes: 25 raise ValueError("The filename should have an extension when using SNPReader.") 26 27 extension = suffixes[-2] if suffixes[-1].lower() in (".zst", ".gz") else suffixes[-1] 28 extension = extension.lower() 29 30 if extension == ".vcf": 31 if vcf_backend == 'polars': 32 from snputils.snp.io.read.vcf import VCFReaderPolars 33 34 return VCFReaderPolars(filename) 35 elif vcf_backend == 'scikit-allel': 36 from snputils.snp.io.read.vcf import VCFReader 37 38 return VCFReader(filename) 39 else: 40 raise ValueError(f"VCF backend not supported: {vcf_backend}") 41 elif extension in (".bed", ".bim", ".fam"): 42 from snputils.snp.io.read.bed import BEDReader 43 44 return BEDReader(filename) 45 elif extension in (".pgen", ".pvar", ".psam", ".pvar.zst"): 46 from snputils.snp.io.read.pgen import PGENReader 47 48 return PGENReader(filename) 49 else: 50 raise ValueError(f"File format not supported: {filename}")
9 def __new__(cls, 10 filename: Union[str, pathlib.Path], 11 vcf_backend: str = 'polars') -> SNPReader: 12 """ 13 Automatically detect the SNP file format from the file extension, and return its corresponding reader. 14 15 Args: 16 filename: Filename of the file to read. 17 vcf_backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. Default is 'polars'. 18 19 Raises: 20 ValueError: If the filename does not have an extension or the extension is not supported. 21 """ 22 filename = pathlib.Path(filename) 23 suffixes = filename.suffixes 24 if not suffixes: 25 raise ValueError("The filename should have an extension when using SNPReader.") 26 27 extension = suffixes[-2] if suffixes[-1].lower() in (".zst", ".gz") else suffixes[-1] 28 extension = extension.lower() 29 30 if extension == ".vcf": 31 if vcf_backend == 'polars': 32 from snputils.snp.io.read.vcf import VCFReaderPolars 33 34 return VCFReaderPolars(filename) 35 elif vcf_backend == 'scikit-allel': 36 from snputils.snp.io.read.vcf import VCFReader 37 38 return VCFReader(filename) 39 else: 40 raise ValueError(f"VCF backend not supported: {vcf_backend}") 41 elif extension in (".bed", ".bim", ".fam"): 42 from snputils.snp.io.read.bed import BEDReader 43 44 return BEDReader(filename) 45 elif extension in (".pgen", ".pvar", ".psam", ".pvar.zst"): 46 from snputils.snp.io.read.pgen import PGENReader 47 48 return PGENReader(filename) 49 else: 50 raise ValueError(f"File format not supported: {filename}")
Automatically detect the SNP file format from the file extension, and return its corresponding reader.
Arguments:
- filename: Filename of the file to read.
- vcf_backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. Default is 'polars'.
Raises:
- ValueError: If the filename does not have an extension or the extension is not supported.
16@SNPBaseReader.register 17class BEDReader(SNPBaseReader): 18 def read( 19 self, 20 fields: Optional[List[str]] = None, 21 exclude_fields: Optional[List[str]] = None, 22 sample_ids: Optional[np.ndarray] = None, 23 sample_idxs: Optional[np.ndarray] = None, 24 variant_ids: Optional[np.ndarray] = None, 25 variant_idxs: Optional[np.ndarray] = None, 26 sum_strands: bool = False, 27 separator: Optional[str] = None, 28 ) -> SNPObject: 29 """ 30 Read a bed fileset (bed, bim, fam) into a SNPObject. 31 32 Args: 33 fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. 34 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. 35 To extract all fields, set fields to None. Defaults to None. 36 exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. 37 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. 38 To exclude no fields, set exclude_fields to None. Defaults to None. 39 sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read. 40 sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read. 41 variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read. 42 variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read. 43 sum_strands: If True, maternal and paternal strands are combined into a single `int8` array with values `{0, 1, 2`}. 44 If False, strands are stored separately as an `int8` array with values `{0, 1}` for each strand. 45 Note: With the pgenlib backend, `False` uses `~8×` more RAM, though `calldata_gt` is only `2×` larger. 46 separator: Separator used in the pvar file. If None, the separator is automatically detected. 47 If the automatic detection fails, please specify the separator manually. 48 49 Returns: 50 **SNPObject**: 51 A SNPObject instance. 52 """ 53 assert ( 54 sample_idxs is None or sample_ids is None 55 ), "Only one of sample_idxs and sample_ids can be specified" 56 assert ( 57 variant_idxs is None or variant_ids is None 58 ), "Only one of variant_idxs and variant_ids can be specified" 59 60 if isinstance(fields, str): 61 fields = [fields] 62 if isinstance(exclude_fields, str): 63 exclude_fields = [exclude_fields] 64 65 fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS"] 66 exclude_fields = exclude_fields or [] 67 fields = [field for field in fields if field not in exclude_fields] 68 only_read_bed = fields == ["GT"] and variant_idxs is None and sample_idxs is None 69 70 filename_noext = str(self.filename) 71 if filename_noext[-4:].lower() in (".bed", ".bim", ".fam"): 72 filename_noext = filename_noext[:-4] 73 74 if only_read_bed: 75 with open(filename_noext + '.fam', 'r') as f: 76 file_num_samples = sum(1 for _ in f) # Get sample count from fam file 77 file_num_variants = None # Not needed 78 else: 79 log.info(f"Reading {filename_noext}.bim") 80 81 if separator is None: 82 with open(filename_noext + ".bim", "r") as file: 83 separator = csv.Sniffer().sniff(file.readline()).delimiter 84 85 bim = pl.read_csv( 86 filename_noext + ".bim", 87 separator=separator, 88 has_header=False, 89 new_columns=["#CHROM", "ID", "CM", "POS", "ALT", "REF"], 90 schema_overrides={ 91 "#CHROM": pl.String, 92 "ID": pl.String, 93 "CM": pl.Float64, 94 "POS": pl.Int64, 95 "ALT": pl.String, 96 "REF": pl.String 97 }, 98 null_values=["NA"] 99 ).with_row_index() 100 file_num_variants = bim.height 101 102 if variant_ids is not None: 103 variant_idxs = bim.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy() 104 105 if variant_idxs is None: 106 num_variants = file_num_variants 107 variant_idxs = np.arange(num_variants, dtype=np.uint32) 108 else: 109 num_variants = np.size(variant_idxs) 110 variant_idxs = np.array(variant_idxs, dtype=np.uint32) 111 bim = bim.filter(pl.col("index").is_in(variant_idxs)) 112 113 log.info(f"Reading {filename_noext}.fam") 114 115 fam = pl.read_csv( 116 filename_noext + ".fam", 117 separator=separator, 118 has_header=False, 119 new_columns=["Family ID", "IID", "Father ID", 120 "Mother ID", "Sex code", "Phenotype value"], 121 schema_overrides={ 122 "Family ID": pl.String, 123 "IID": pl.String, 124 "Father ID": pl.String, 125 "Mother ID": pl.String, 126 "Sex code": pl.String, 127 }, 128 null_values=["NA"] 129 ).with_row_index() 130 file_num_samples = fam.height 131 132 if sample_ids is not None: 133 sample_idxs = fam.filter(pl.col("IID").is_in(sample_ids)).select("index").to_series().to_numpy() 134 135 if sample_idxs is None: 136 num_samples = file_num_samples 137 else: 138 num_samples = np.size(sample_idxs) 139 sample_idxs = np.array(sample_idxs, dtype=np.uint32) 140 fam = fam.filter(pl.col("index").is_in(sample_idxs)) 141 142 if "GT" in fields: 143 log.info(f"Reading {filename_noext}.bed") 144 pgen_reader = pg.PgenReader( 145 str.encode(filename_noext + ".bed"), 146 raw_sample_ct=file_num_samples, 147 variant_ct=file_num_variants, 148 sample_subset=sample_idxs, 149 ) 150 151 if only_read_bed: 152 num_samples = pgen_reader.get_raw_sample_ct() 153 num_variants = pgen_reader.get_variant_ct() 154 variant_idxs = np.arange(num_variants, dtype=np.uint32) 155 156 # required arrays: variant_idxs + sample_idxs + genotypes 157 if not sum_strands: 158 required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4 159 else: 160 required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples 161 log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each") 162 163 if not sum_strands: 164 genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32) # cannot use int8 because of pgenlib 165 pgen_reader.read_alleles_list(variant_idxs, genotypes) 166 genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2)) 167 else: 168 genotypes = np.empty((num_variants, num_samples), dtype=np.int8) 169 pgen_reader.read_list(variant_idxs, genotypes) 170 pgen_reader.close() 171 else: 172 genotypes = None 173 174 log.info("Constructing SNPObject") 175 176 snpobj = SNPObject( 177 calldata_gt=genotypes if "GT" in fields else None, 178 samples=fam.get_column("IID").to_numpy() if "IID" in fields and "IID" in fam.columns else None, 179 **{f'variants_{k.lower()}': bim.get_column(v).to_numpy() if v in fields and v in bim.columns else None 180 for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS'}.items()} 181 ) 182 183 log.info("Finished constructing SNPObject") 184 return snpobj
Abstract class for SNP readers.
Attributes:
- _filename: The path to the file storing SNP data.
18 def read( 19 self, 20 fields: Optional[List[str]] = None, 21 exclude_fields: Optional[List[str]] = None, 22 sample_ids: Optional[np.ndarray] = None, 23 sample_idxs: Optional[np.ndarray] = None, 24 variant_ids: Optional[np.ndarray] = None, 25 variant_idxs: Optional[np.ndarray] = None, 26 sum_strands: bool = False, 27 separator: Optional[str] = None, 28 ) -> SNPObject: 29 """ 30 Read a bed fileset (bed, bim, fam) into a SNPObject. 31 32 Args: 33 fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. 34 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. 35 To extract all fields, set fields to None. Defaults to None. 36 exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. 37 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. 38 To exclude no fields, set exclude_fields to None. Defaults to None. 39 sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read. 40 sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read. 41 variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read. 42 variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read. 43 sum_strands: If True, maternal and paternal strands are combined into a single `int8` array with values `{0, 1, 2`}. 44 If False, strands are stored separately as an `int8` array with values `{0, 1}` for each strand. 45 Note: With the pgenlib backend, `False` uses `~8×` more RAM, though `calldata_gt` is only `2×` larger. 46 separator: Separator used in the pvar file. If None, the separator is automatically detected. 47 If the automatic detection fails, please specify the separator manually. 48 49 Returns: 50 **SNPObject**: 51 A SNPObject instance. 52 """ 53 assert ( 54 sample_idxs is None or sample_ids is None 55 ), "Only one of sample_idxs and sample_ids can be specified" 56 assert ( 57 variant_idxs is None or variant_ids is None 58 ), "Only one of variant_idxs and variant_ids can be specified" 59 60 if isinstance(fields, str): 61 fields = [fields] 62 if isinstance(exclude_fields, str): 63 exclude_fields = [exclude_fields] 64 65 fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS"] 66 exclude_fields = exclude_fields or [] 67 fields = [field for field in fields if field not in exclude_fields] 68 only_read_bed = fields == ["GT"] and variant_idxs is None and sample_idxs is None 69 70 filename_noext = str(self.filename) 71 if filename_noext[-4:].lower() in (".bed", ".bim", ".fam"): 72 filename_noext = filename_noext[:-4] 73 74 if only_read_bed: 75 with open(filename_noext + '.fam', 'r') as f: 76 file_num_samples = sum(1 for _ in f) # Get sample count from fam file 77 file_num_variants = None # Not needed 78 else: 79 log.info(f"Reading {filename_noext}.bim") 80 81 if separator is None: 82 with open(filename_noext + ".bim", "r") as file: 83 separator = csv.Sniffer().sniff(file.readline()).delimiter 84 85 bim = pl.read_csv( 86 filename_noext + ".bim", 87 separator=separator, 88 has_header=False, 89 new_columns=["#CHROM", "ID", "CM", "POS", "ALT", "REF"], 90 schema_overrides={ 91 "#CHROM": pl.String, 92 "ID": pl.String, 93 "CM": pl.Float64, 94 "POS": pl.Int64, 95 "ALT": pl.String, 96 "REF": pl.String 97 }, 98 null_values=["NA"] 99 ).with_row_index() 100 file_num_variants = bim.height 101 102 if variant_ids is not None: 103 variant_idxs = bim.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy() 104 105 if variant_idxs is None: 106 num_variants = file_num_variants 107 variant_idxs = np.arange(num_variants, dtype=np.uint32) 108 else: 109 num_variants = np.size(variant_idxs) 110 variant_idxs = np.array(variant_idxs, dtype=np.uint32) 111 bim = bim.filter(pl.col("index").is_in(variant_idxs)) 112 113 log.info(f"Reading {filename_noext}.fam") 114 115 fam = pl.read_csv( 116 filename_noext + ".fam", 117 separator=separator, 118 has_header=False, 119 new_columns=["Family ID", "IID", "Father ID", 120 "Mother ID", "Sex code", "Phenotype value"], 121 schema_overrides={ 122 "Family ID": pl.String, 123 "IID": pl.String, 124 "Father ID": pl.String, 125 "Mother ID": pl.String, 126 "Sex code": pl.String, 127 }, 128 null_values=["NA"] 129 ).with_row_index() 130 file_num_samples = fam.height 131 132 if sample_ids is not None: 133 sample_idxs = fam.filter(pl.col("IID").is_in(sample_ids)).select("index").to_series().to_numpy() 134 135 if sample_idxs is None: 136 num_samples = file_num_samples 137 else: 138 num_samples = np.size(sample_idxs) 139 sample_idxs = np.array(sample_idxs, dtype=np.uint32) 140 fam = fam.filter(pl.col("index").is_in(sample_idxs)) 141 142 if "GT" in fields: 143 log.info(f"Reading {filename_noext}.bed") 144 pgen_reader = pg.PgenReader( 145 str.encode(filename_noext + ".bed"), 146 raw_sample_ct=file_num_samples, 147 variant_ct=file_num_variants, 148 sample_subset=sample_idxs, 149 ) 150 151 if only_read_bed: 152 num_samples = pgen_reader.get_raw_sample_ct() 153 num_variants = pgen_reader.get_variant_ct() 154 variant_idxs = np.arange(num_variants, dtype=np.uint32) 155 156 # required arrays: variant_idxs + sample_idxs + genotypes 157 if not sum_strands: 158 required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4 159 else: 160 required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples 161 log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each") 162 163 if not sum_strands: 164 genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32) # cannot use int8 because of pgenlib 165 pgen_reader.read_alleles_list(variant_idxs, genotypes) 166 genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2)) 167 else: 168 genotypes = np.empty((num_variants, num_samples), dtype=np.int8) 169 pgen_reader.read_list(variant_idxs, genotypes) 170 pgen_reader.close() 171 else: 172 genotypes = None 173 174 log.info("Constructing SNPObject") 175 176 snpobj = SNPObject( 177 calldata_gt=genotypes if "GT" in fields else None, 178 samples=fam.get_column("IID").to_numpy() if "IID" in fields and "IID" in fam.columns else None, 179 **{f'variants_{k.lower()}': bim.get_column(v).to_numpy() if v in fields and v in bim.columns else None 180 for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS'}.items()} 181 ) 182 183 log.info("Finished constructing SNPObject") 184 return snpobj
Read a bed fileset (bed, bim, fam) into a SNPObject.
Arguments:
- fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. To extract all fields, set fields to None. Defaults to None.
- exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. To exclude no fields, set exclude_fields to None. Defaults to None.
- sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
- sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
- variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
- variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
- sum_strands: If True, maternal and paternal strands are combined into a single
int8
array with values{0, 1, 2
}. If False, strands are stored separately as anint8
array with values{0, 1}
for each strand. Note: With the pgenlib backend,False
uses~8×
more RAM, thoughcalldata_gt
is only2×
larger. - separator: Separator used in the pvar file. If None, the separator is automatically detected. If the automatic detection fails, please specify the separator manually.
Returns:
SNPObject: A SNPObject instance.
17@SNPBaseReader.register 18class PGENReader(SNPBaseReader): 19 def read( 20 self, 21 fields: Optional[List[str]] = None, 22 exclude_fields: Optional[List[str]] = None, 23 sample_ids: Optional[np.ndarray] = None, 24 sample_idxs: Optional[np.ndarray] = None, 25 variant_ids: Optional[np.ndarray] = None, 26 variant_idxs: Optional[np.ndarray] = None, 27 sum_strands: bool = False, 28 separator: str = None, 29 ) -> SNPObject: 30 """ 31 Read a pgen fileset (pgen, psam, pvar) into a SNPObject. 32 33 Args: 34 fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. 35 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. 36 To extract all fields, set fields to None. Defaults to None. 37 exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. 38 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. 39 To exclude no fields, set exclude_fields to None. Defaults to None. 40 sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read. 41 sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read. 42 variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read. 43 variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read. 44 sum_strands: If True, maternal and paternal strands are combined into a single `int8` array with values `{0, 1, 2`}. 45 If False, strands are stored separately as an `int8` array with values `{0, 1}` for each strand. 46 Note: With the pgenlib backend, `False` uses `~8×` more RAM, though `calldata_gt` is only `2×` larger. 47 separator: Separator used in the pvar file. If None, the separator is automatically detected. 48 If the automatic detection fails, please specify the separator manually. 49 50 Returns: 51 **SNPObject**: 52 A SNPObject instance. 53 """ 54 assert ( 55 sample_idxs is None or sample_ids is None 56 ), "Only one of sample_idxs and sample_ids can be specified" 57 assert ( 58 variant_idxs is None or variant_ids is None 59 ), "Only one of variant_idxs and variant_ids can be specified" 60 61 if isinstance(fields, str): 62 fields = [fields] 63 if isinstance(exclude_fields, str): 64 exclude_fields = [exclude_fields] 65 66 fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS", "FILTER", "QUAL"] 67 exclude_fields = exclude_fields or [] 68 fields = [field for field in fields if field not in exclude_fields] 69 only_read_pgen = fields == ["GT"] and variant_idxs is None and sample_idxs is None 70 71 filename_noext = str(self.filename) 72 for ext in [".pgen", ".pvar", ".pvar.zst", ".psam"]: 73 if filename_noext.endswith(ext): 74 filename_noext = filename_noext[:-len(ext)] 75 break 76 77 if only_read_pgen: 78 file_num_samples = None # Not needed for pgen 79 file_num_variants = None # Not needed 80 else: 81 pvar_extensions = [".pvar", ".pvar.zst"] 82 pvar_filename = None 83 for ext in pvar_extensions: 84 possible_pvar = filename_noext + ext 85 if os.path.exists(possible_pvar): 86 pvar_filename = possible_pvar 87 break 88 if pvar_filename is None: 89 raise FileNotFoundError(f"No .pvar or .pvar.zst file found for {filename_noext}") 90 91 log.info(f"Reading {pvar_filename}") 92 93 def open_textfile(filename): 94 if filename.endswith('.zst'): 95 import zstandard as zstd 96 return zstd.open(filename, 'rt') 97 else: 98 return open(filename, 'rt') 99 100 pvar_has_header = True 101 pvar_header_line_num = 0 102 with open_textfile(pvar_filename) as file: 103 for line_num, line in enumerate(file): 104 if line.startswith("##"): # Metadata 105 continue 106 else: 107 if separator is None: 108 separator = csv.Sniffer().sniff(file.readline()).delimiter 109 if line.startswith("#CHROM"): # Header 110 pvar_header_line_num = line_num 111 header = line.strip().split() 112 break 113 elif not line.startswith("#"): # If no header, look at line 1 114 pvar_has_header = False 115 cols_in_pvar = len(line.strip().split(separator)) 116 if cols_in_pvar == 5: 117 header = ["#CHROM", "ID", "POS", "ALT", "REF"] 118 elif cols_in_pvar == 6: 119 header = ["#CHROM", "ID", "CM", "POS", "ALT", "REF"] 120 else: 121 raise ValueError( 122 f"{pvar_filename} is not a valid pvar file." 123 ) 124 break 125 126 pvar_reading_args = { 127 'separator': separator, 128 'skip_rows': pvar_header_line_num, 129 'has_header': pvar_has_header, 130 'new_columns': None if pvar_has_header else header, 131 'schema_overrides': { 132 "#CHROM": pl.String, 133 "POS": pl.UInt32, 134 "ID": pl.String, 135 "REF": pl.String, 136 "ALT": pl.String, 137 }, 138 'null_values': ["NA"], 139 } 140 if pvar_filename.endswith('.zst'): 141 pvar = pl.read_csv(pvar_filename, **pvar_reading_args).lazy() 142 else: 143 pvar = pl.scan_csv(pvar_filename, **pvar_reading_args) 144 145 # We need to track row positions of variants but doing this with lazy operations would 146 # defeat predicate pushdown optimization and force loading the entire DataFrame. 147 # Solution: We only materialize the ID column with indices, keeping memory usage minimal 148 # while maintaining the ability to map between variant IDs and their file positions. 149 variant_id_idxs = pvar.select("ID").with_row_index().collect() 150 151 file_num_variants = variant_id_idxs.height 152 153 if variant_ids is not None: 154 num_variants = np.size(variant_ids) 155 pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect() 156 variant_idxs = variant_id_idxs.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy() 157 elif variant_idxs is not None: 158 num_variants = np.size(variant_idxs) 159 variant_idxs = np.array(variant_idxs, dtype=np.uint32) 160 variant_ids = variant_id_idxs.filter(pl.col("index").is_in(variant_idxs)).select("ID") 161 pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect() 162 else: 163 num_variants = file_num_variants 164 variant_idxs = np.arange(num_variants, dtype=np.uint32) 165 pvar = pvar.collect() 166 167 log.info(f"Reading {filename_noext}.psam") 168 169 with open(filename_noext + ".psam") as file: 170 first_line = file.readline().strip() 171 psam_has_header = first_line.startswith(("#FID", "FID", "#IID", "IID")) 172 173 psam = pl.read_csv( 174 filename_noext + ".psam", 175 separator=separator, 176 has_header=psam_has_header, 177 new_columns=None if psam_has_header else ["FID", "IID", "PAT", "MAT", "SEX", "PHENO1"], 178 null_values=["NA"], 179 ).with_row_index() 180 if "#IID" in psam.columns: 181 psam = psam.rename({"#IID": "IID"}) 182 if "#FID" in psam.columns: 183 psam = psam.rename({"#FID": "FID"}) 184 185 file_num_samples = psam.height 186 187 if sample_ids is not None: 188 num_samples = np.size(sample_ids) 189 psam = psam.filter(pl.col("IID").is_in(sample_ids)) 190 sample_idxs = psam.select("index").to_series().to_numpy() 191 elif sample_idxs is not None: 192 num_samples = np.size(sample_idxs) 193 sample_idxs = np.array(sample_idxs, dtype=np.uint32) 194 psam = psam.filter(pl.col("index").is_in(sample_idxs)) 195 else: 196 num_samples = file_num_samples 197 198 if "GT" in fields: 199 log.info(f"Reading {filename_noext}.pgen") 200 pgen_reader = pg.PgenReader( 201 str.encode(filename_noext + ".pgen"), 202 raw_sample_ct=file_num_samples, 203 variant_ct=file_num_variants, 204 sample_subset=sample_idxs, 205 ) 206 207 if only_read_pgen: 208 num_samples = pgen_reader.get_raw_sample_ct() 209 num_variants = pgen_reader.get_variant_ct() 210 variant_idxs = np.arange(num_variants, dtype=np.uint32) 211 212 # required arrays: variant_idxs + sample_idxs + genotypes 213 if not sum_strands: 214 required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4 215 else: 216 required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples 217 log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each") 218 219 if not sum_strands: 220 genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32) # cannot use int8 because of pgenlib 221 pgen_reader.read_alleles_list(variant_idxs, genotypes) 222 genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2)) 223 else: 224 genotypes = np.empty((num_variants, num_samples), dtype=np.int8) 225 pgen_reader.read_list(variant_idxs, genotypes) 226 pgen_reader.close() 227 else: 228 genotypes = None 229 230 log.info("Constructing SNPObject") 231 232 snpobj = SNPObject( 233 calldata_gt=genotypes if "GT" in fields else None, 234 samples=psam.get_column("IID").to_numpy() if "IID" in fields and "IID" in psam.columns else None, 235 **{f'variants_{k.lower()}': pvar.get_column(v).to_numpy() if v in fields and v in pvar.columns else None 236 for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS', 'filter_pass': 'FILTER', 'qual': 'QUAL'}.items()} 237 ) 238 239 log.info("Finished constructing SNPObject") 240 return snpobj
Abstract class for SNP readers.
Attributes:
- _filename: The path to the file storing SNP data.
19 def read( 20 self, 21 fields: Optional[List[str]] = None, 22 exclude_fields: Optional[List[str]] = None, 23 sample_ids: Optional[np.ndarray] = None, 24 sample_idxs: Optional[np.ndarray] = None, 25 variant_ids: Optional[np.ndarray] = None, 26 variant_idxs: Optional[np.ndarray] = None, 27 sum_strands: bool = False, 28 separator: str = None, 29 ) -> SNPObject: 30 """ 31 Read a pgen fileset (pgen, psam, pvar) into a SNPObject. 32 33 Args: 34 fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. 35 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. 36 To extract all fields, set fields to None. Defaults to None. 37 exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. 38 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. 39 To exclude no fields, set exclude_fields to None. Defaults to None. 40 sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read. 41 sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read. 42 variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read. 43 variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read. 44 sum_strands: If True, maternal and paternal strands are combined into a single `int8` array with values `{0, 1, 2`}. 45 If False, strands are stored separately as an `int8` array with values `{0, 1}` for each strand. 46 Note: With the pgenlib backend, `False` uses `~8×` more RAM, though `calldata_gt` is only `2×` larger. 47 separator: Separator used in the pvar file. If None, the separator is automatically detected. 48 If the automatic detection fails, please specify the separator manually. 49 50 Returns: 51 **SNPObject**: 52 A SNPObject instance. 53 """ 54 assert ( 55 sample_idxs is None or sample_ids is None 56 ), "Only one of sample_idxs and sample_ids can be specified" 57 assert ( 58 variant_idxs is None or variant_ids is None 59 ), "Only one of variant_idxs and variant_ids can be specified" 60 61 if isinstance(fields, str): 62 fields = [fields] 63 if isinstance(exclude_fields, str): 64 exclude_fields = [exclude_fields] 65 66 fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS", "FILTER", "QUAL"] 67 exclude_fields = exclude_fields or [] 68 fields = [field for field in fields if field not in exclude_fields] 69 only_read_pgen = fields == ["GT"] and variant_idxs is None and sample_idxs is None 70 71 filename_noext = str(self.filename) 72 for ext in [".pgen", ".pvar", ".pvar.zst", ".psam"]: 73 if filename_noext.endswith(ext): 74 filename_noext = filename_noext[:-len(ext)] 75 break 76 77 if only_read_pgen: 78 file_num_samples = None # Not needed for pgen 79 file_num_variants = None # Not needed 80 else: 81 pvar_extensions = [".pvar", ".pvar.zst"] 82 pvar_filename = None 83 for ext in pvar_extensions: 84 possible_pvar = filename_noext + ext 85 if os.path.exists(possible_pvar): 86 pvar_filename = possible_pvar 87 break 88 if pvar_filename is None: 89 raise FileNotFoundError(f"No .pvar or .pvar.zst file found for {filename_noext}") 90 91 log.info(f"Reading {pvar_filename}") 92 93 def open_textfile(filename): 94 if filename.endswith('.zst'): 95 import zstandard as zstd 96 return zstd.open(filename, 'rt') 97 else: 98 return open(filename, 'rt') 99 100 pvar_has_header = True 101 pvar_header_line_num = 0 102 with open_textfile(pvar_filename) as file: 103 for line_num, line in enumerate(file): 104 if line.startswith("##"): # Metadata 105 continue 106 else: 107 if separator is None: 108 separator = csv.Sniffer().sniff(file.readline()).delimiter 109 if line.startswith("#CHROM"): # Header 110 pvar_header_line_num = line_num 111 header = line.strip().split() 112 break 113 elif not line.startswith("#"): # If no header, look at line 1 114 pvar_has_header = False 115 cols_in_pvar = len(line.strip().split(separator)) 116 if cols_in_pvar == 5: 117 header = ["#CHROM", "ID", "POS", "ALT", "REF"] 118 elif cols_in_pvar == 6: 119 header = ["#CHROM", "ID", "CM", "POS", "ALT", "REF"] 120 else: 121 raise ValueError( 122 f"{pvar_filename} is not a valid pvar file." 123 ) 124 break 125 126 pvar_reading_args = { 127 'separator': separator, 128 'skip_rows': pvar_header_line_num, 129 'has_header': pvar_has_header, 130 'new_columns': None if pvar_has_header else header, 131 'schema_overrides': { 132 "#CHROM": pl.String, 133 "POS": pl.UInt32, 134 "ID": pl.String, 135 "REF": pl.String, 136 "ALT": pl.String, 137 }, 138 'null_values': ["NA"], 139 } 140 if pvar_filename.endswith('.zst'): 141 pvar = pl.read_csv(pvar_filename, **pvar_reading_args).lazy() 142 else: 143 pvar = pl.scan_csv(pvar_filename, **pvar_reading_args) 144 145 # We need to track row positions of variants but doing this with lazy operations would 146 # defeat predicate pushdown optimization and force loading the entire DataFrame. 147 # Solution: We only materialize the ID column with indices, keeping memory usage minimal 148 # while maintaining the ability to map between variant IDs and their file positions. 149 variant_id_idxs = pvar.select("ID").with_row_index().collect() 150 151 file_num_variants = variant_id_idxs.height 152 153 if variant_ids is not None: 154 num_variants = np.size(variant_ids) 155 pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect() 156 variant_idxs = variant_id_idxs.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy() 157 elif variant_idxs is not None: 158 num_variants = np.size(variant_idxs) 159 variant_idxs = np.array(variant_idxs, dtype=np.uint32) 160 variant_ids = variant_id_idxs.filter(pl.col("index").is_in(variant_idxs)).select("ID") 161 pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect() 162 else: 163 num_variants = file_num_variants 164 variant_idxs = np.arange(num_variants, dtype=np.uint32) 165 pvar = pvar.collect() 166 167 log.info(f"Reading {filename_noext}.psam") 168 169 with open(filename_noext + ".psam") as file: 170 first_line = file.readline().strip() 171 psam_has_header = first_line.startswith(("#FID", "FID", "#IID", "IID")) 172 173 psam = pl.read_csv( 174 filename_noext + ".psam", 175 separator=separator, 176 has_header=psam_has_header, 177 new_columns=None if psam_has_header else ["FID", "IID", "PAT", "MAT", "SEX", "PHENO1"], 178 null_values=["NA"], 179 ).with_row_index() 180 if "#IID" in psam.columns: 181 psam = psam.rename({"#IID": "IID"}) 182 if "#FID" in psam.columns: 183 psam = psam.rename({"#FID": "FID"}) 184 185 file_num_samples = psam.height 186 187 if sample_ids is not None: 188 num_samples = np.size(sample_ids) 189 psam = psam.filter(pl.col("IID").is_in(sample_ids)) 190 sample_idxs = psam.select("index").to_series().to_numpy() 191 elif sample_idxs is not None: 192 num_samples = np.size(sample_idxs) 193 sample_idxs = np.array(sample_idxs, dtype=np.uint32) 194 psam = psam.filter(pl.col("index").is_in(sample_idxs)) 195 else: 196 num_samples = file_num_samples 197 198 if "GT" in fields: 199 log.info(f"Reading {filename_noext}.pgen") 200 pgen_reader = pg.PgenReader( 201 str.encode(filename_noext + ".pgen"), 202 raw_sample_ct=file_num_samples, 203 variant_ct=file_num_variants, 204 sample_subset=sample_idxs, 205 ) 206 207 if only_read_pgen: 208 num_samples = pgen_reader.get_raw_sample_ct() 209 num_variants = pgen_reader.get_variant_ct() 210 variant_idxs = np.arange(num_variants, dtype=np.uint32) 211 212 # required arrays: variant_idxs + sample_idxs + genotypes 213 if not sum_strands: 214 required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4 215 else: 216 required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples 217 log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each") 218 219 if not sum_strands: 220 genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32) # cannot use int8 because of pgenlib 221 pgen_reader.read_alleles_list(variant_idxs, genotypes) 222 genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2)) 223 else: 224 genotypes = np.empty((num_variants, num_samples), dtype=np.int8) 225 pgen_reader.read_list(variant_idxs, genotypes) 226 pgen_reader.close() 227 else: 228 genotypes = None 229 230 log.info("Constructing SNPObject") 231 232 snpobj = SNPObject( 233 calldata_gt=genotypes if "GT" in fields else None, 234 samples=psam.get_column("IID").to_numpy() if "IID" in fields and "IID" in psam.columns else None, 235 **{f'variants_{k.lower()}': pvar.get_column(v).to_numpy() if v in fields and v in pvar.columns else None 236 for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS', 'filter_pass': 'FILTER', 'qual': 'QUAL'}.items()} 237 ) 238 239 log.info("Finished constructing SNPObject") 240 return snpobj
Read a pgen fileset (pgen, psam, pvar) into a SNPObject.
Arguments:
- fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. To extract all fields, set fields to None. Defaults to None.
- exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. To exclude no fields, set exclude_fields to None. Defaults to None.
- sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
- sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
- variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
- variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
- sum_strands: If True, maternal and paternal strands are combined into a single
int8
array with values{0, 1, 2
}. If False, strands are stored separately as anint8
array with values{0, 1}
for each strand. Note: With the pgenlib backend,False
uses~8×
more RAM, thoughcalldata_gt
is only2×
larger. - separator: Separator used in the pvar file. If None, the separator is automatically detected. If the automatic detection fails, please specify the separator manually.
Returns:
SNPObject: A SNPObject instance.
18@SNPBaseReader.register 19class VCFReader(SNPBaseReader): 20 def read( 21 self, 22 fields: Optional[List[str]] = None, 23 exclude_fields: Optional[List[str]] = None, 24 rename_fields: Optional[dict] = None, 25 fills: Optional[dict] = None, 26 region: Optional[str] = None, 27 samples: Optional[List[str]] = None, 28 sum_strands: bool = False, 29 ) -> SNPObject: 30 """ 31 Read a vcf file into a SNPObject. 32 33 Args: 34 fields: Fields to extract data for. e.g., ['variants/CHROM', 'variants/POS', 35 'calldata/GT']. If you are feeling lazy, you can drop the 'variants/' 36 and 'calldata/' prefixes, in which case the fields will be matched 37 against fields declared in the VCF header, with variants taking priority 38 over calldata if a field with the same ID exists both in INFO and FORMAT 39 headers. I.e., ['CHROM', 'POS', 'DP', 'GT'] will work, although watch out 40 for fields like 'DP' which can be both INFO and FORMAT. To extract all 41 fields, provide just the string '*'. To extract all variants fields 42 (including all INFO fields) provide 'variants/*'. To extract all 43 calldata fields (i.e., defined in FORMAT headers) provide 'calldata/*'. 44 exclude_fields: Fields to exclude. E.g., for use in combination with fields='*'. 45 rename_fields: Fields to be renamed. Should be a dictionary mapping old to new names. 46 fills: Override the fill value used for empty values. Should be a dictionary 47 mapping field names to fill values. 48 region: Genomic region to extract variants for. If provided, should be a 49 tabix-style region string, which can be either just a chromosome name 50 (e.g., '2L'), or a chromosome name followed by 1-based beginning and 51 end coordinates (e.g., '2L:100000-200000'). Note that only variants 52 whose start position (POS) is within the requested range will be included. 53 This is slightly different from the default tabix behaviour, where a 54 variant (e.g., deletion) may be included if its position (POS) occurs 55 before the requested region but its reference allele overlaps the 56 region - such a variant will not be included in the data returned 57 by this function. 58 samples: Selection of samples to extract calldata for. If provided, should be 59 a list of strings giving sample identifiers. May also be a list of 60 integers giving indices of selected samples. 61 sum_strands: If True, maternal and paternal strands are combined into a single `int8` array with values `{0, 1, 2`}. 62 If False, strands are stored separately as an `int8` array with values `{0, 1}` for each strand. 63 64 Returns: 65 **SNPObject**: 66 A SNPObject instance. 67 """ 68 log.info(f"Reading {self.filename}") 69 70 vcf_dict = allel.read_vcf( 71 str(self.filename), 72 fields=fields, 73 exclude_fields=exclude_fields, 74 rename_fields=rename_fields, 75 fills=fills, 76 region=region, 77 samples=samples, 78 alt_number=1, 79 ) 80 assert vcf_dict is not None # suppress Flake8 warning 81 82 genotypes = vcf_dict["calldata/GT"].astype(np.int8) 83 if sum_strands: 84 genotypes = genotypes.sum(axis=2, dtype=np.int8) 85 86 snpobj = SNPObject( 87 calldata_gt=genotypes, 88 samples=vcf_dict["samples"], 89 variants_ref=vcf_dict["variants/REF"], 90 variants_alt=vcf_dict["variants/ALT"], 91 variants_chrom=vcf_dict["variants/CHROM"], 92 variants_filter_pass=vcf_dict["variants/FILTER_PASS"], 93 variants_id=vcf_dict["variants/ID"], 94 variants_pos=vcf_dict["variants/POS"], 95 variants_qual=vcf_dict["variants/QUAL"], 96 ) 97 98 log.info(f"Finished reading {self.filename}") 99 return snpobj
Abstract class for SNP readers.
Attributes:
- _filename: The path to the file storing SNP data.
20 def read( 21 self, 22 fields: Optional[List[str]] = None, 23 exclude_fields: Optional[List[str]] = None, 24 rename_fields: Optional[dict] = None, 25 fills: Optional[dict] = None, 26 region: Optional[str] = None, 27 samples: Optional[List[str]] = None, 28 sum_strands: bool = False, 29 ) -> SNPObject: 30 """ 31 Read a vcf file into a SNPObject. 32 33 Args: 34 fields: Fields to extract data for. e.g., ['variants/CHROM', 'variants/POS', 35 'calldata/GT']. If you are feeling lazy, you can drop the 'variants/' 36 and 'calldata/' prefixes, in which case the fields will be matched 37 against fields declared in the VCF header, with variants taking priority 38 over calldata if a field with the same ID exists both in INFO and FORMAT 39 headers. I.e., ['CHROM', 'POS', 'DP', 'GT'] will work, although watch out 40 for fields like 'DP' which can be both INFO and FORMAT. To extract all 41 fields, provide just the string '*'. To extract all variants fields 42 (including all INFO fields) provide 'variants/*'. To extract all 43 calldata fields (i.e., defined in FORMAT headers) provide 'calldata/*'. 44 exclude_fields: Fields to exclude. E.g., for use in combination with fields='*'. 45 rename_fields: Fields to be renamed. Should be a dictionary mapping old to new names. 46 fills: Override the fill value used for empty values. Should be a dictionary 47 mapping field names to fill values. 48 region: Genomic region to extract variants for. If provided, should be a 49 tabix-style region string, which can be either just a chromosome name 50 (e.g., '2L'), or a chromosome name followed by 1-based beginning and 51 end coordinates (e.g., '2L:100000-200000'). Note that only variants 52 whose start position (POS) is within the requested range will be included. 53 This is slightly different from the default tabix behaviour, where a 54 variant (e.g., deletion) may be included if its position (POS) occurs 55 before the requested region but its reference allele overlaps the 56 region - such a variant will not be included in the data returned 57 by this function. 58 samples: Selection of samples to extract calldata for. If provided, should be 59 a list of strings giving sample identifiers. May also be a list of 60 integers giving indices of selected samples. 61 sum_strands: If True, maternal and paternal strands are combined into a single `int8` array with values `{0, 1, 2`}. 62 If False, strands are stored separately as an `int8` array with values `{0, 1}` for each strand. 63 64 Returns: 65 **SNPObject**: 66 A SNPObject instance. 67 """ 68 log.info(f"Reading {self.filename}") 69 70 vcf_dict = allel.read_vcf( 71 str(self.filename), 72 fields=fields, 73 exclude_fields=exclude_fields, 74 rename_fields=rename_fields, 75 fills=fills, 76 region=region, 77 samples=samples, 78 alt_number=1, 79 ) 80 assert vcf_dict is not None # suppress Flake8 warning 81 82 genotypes = vcf_dict["calldata/GT"].astype(np.int8) 83 if sum_strands: 84 genotypes = genotypes.sum(axis=2, dtype=np.int8) 85 86 snpobj = SNPObject( 87 calldata_gt=genotypes, 88 samples=vcf_dict["samples"], 89 variants_ref=vcf_dict["variants/REF"], 90 variants_alt=vcf_dict["variants/ALT"], 91 variants_chrom=vcf_dict["variants/CHROM"], 92 variants_filter_pass=vcf_dict["variants/FILTER_PASS"], 93 variants_id=vcf_dict["variants/ID"], 94 variants_pos=vcf_dict["variants/POS"], 95 variants_qual=vcf_dict["variants/QUAL"], 96 ) 97 98 log.info(f"Finished reading {self.filename}") 99 return snpobj
Read a vcf file into a SNPObject.
Arguments:
- fields: Fields to extract data for. e.g., ['variants/CHROM', 'variants/POS', 'calldata/GT']. If you are feeling lazy, you can drop the 'variants/' and 'calldata/' prefixes, in which case the fields will be matched against fields declared in the VCF header, with variants taking priority over calldata if a field with the same ID exists both in INFO and FORMAT headers. I.e., ['CHROM', 'POS', 'DP', 'GT'] will work, although watch out for fields like 'DP' which can be both INFO and FORMAT. To extract all fields, provide just the string ''. To extract all variants fields (including all INFO fields) provide 'variants/'. To extract all calldata fields (i.e., defined in FORMAT headers) provide 'calldata/*'.
- exclude_fields: Fields to exclude. E.g., for use in combination with fields='*'.
- rename_fields: Fields to be renamed. Should be a dictionary mapping old to new names.
- fills: Override the fill value used for empty values. Should be a dictionary mapping field names to fill values.
- region: Genomic region to extract variants for. If provided, should be a tabix-style region string, which can be either just a chromosome name (e.g., '2L'), or a chromosome name followed by 1-based beginning and end coordinates (e.g., '2L:100000-200000'). Note that only variants whose start position (POS) is within the requested range will be included. This is slightly different from the default tabix behaviour, where a variant (e.g., deletion) may be included if its position (POS) occurs before the requested region but its reference allele overlaps the region - such a variant will not be included in the data returned by this function.
- samples: Selection of samples to extract calldata for. If provided, should be a list of strings giving sample identifiers. May also be a list of integers giving indices of selected samples.
- sum_strands: If True, maternal and paternal strands are combined into a single
int8
array with values{0, 1, 2
}. If False, strands are stored separately as anint8
array with values{0, 1}
for each strand.
Returns:
SNPObject: A SNPObject instance.
14class BEDWriter: 15 """Writes an object in bed/bim/fam formats in the specified output path. 16 17 Args: 18 snpobj: The SNPObject to be written. 19 file: The output file path. 20 21 """ 22 23 def __init__(self, snpobj: SNPObject, filename: str): 24 self.__snpobj = snpobj.copy() 25 self.__filename = Path(filename) 26 27 def write( 28 self, 29 rename_missing_values: bool = True, 30 before: Union[int, float, str] = -1, 31 after: Union[int, float, str] = '.' 32 ): 33 """ 34 Writes the SNPObject to bed/bim/fam formats. 35 36 Args: 37 rename_missing_values (bool, optional): 38 If True, renames potential missing values in `snpobj.calldata_gt` before writing. 39 Defaults to True. 40 before (int, float, or str, default=-1): 41 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 42 Default is -1. 43 after (int, float, or str, default='.'): 44 The value that will replace `before`. Default is '.'. 45 """ 46 # Save .bed file 47 if self.__filename.suffix != '.bed': 48 self.__filename = self.__filename.with_suffix('.bed') 49 50 log.info(f"Writing .bed file: {self.__filename}") 51 52 # Optionally rename potential missing values in `snpobj.calldata_gt` before writing 53 if rename_missing_values: 54 self.__snpobj.rename_missings(before=before, after=after, inplace=True) 55 56 # If the input matrix has three dimensions, it indicates that the data is divided into two strands. 57 if len(self.__snpobj.calldata_gt.shape) == 3: 58 # Sum the two strands 59 self.__snpobj.calldata_gt = self.__snpobj.calldata_gt.transpose(1, 0, 2).sum(axis=2, dtype=np.int8) 60 61 # Infer the number of samples and variants from the matrix 62 samples, variants = self.__snpobj.calldata_gt.shape 63 64 # Define the PgenWriter to save the data 65 data_save = pg.PgenWriter(filename=str(self.__filename).encode('utf-8'), 66 sample_ct=samples, 67 variant_ct=variants, 68 nonref_flags=True, 69 hardcall_phase_present=False, 70 dosage_present=True, 71 dosage_phase_present=False) 72 73 # Fill the data_save object with the matrix of individuals x variants 74 for snp_i in range(0, variants): 75 data_save.append_biallelic(np.ascontiguousarray(self.__snpobj.calldata_gt[:, snp_i])) 76 77 # Save the .bed file 78 data_save.close() 79 80 log.info(f"Finished writing .bed file: {self.__filename}") 81 82 # Remove .bed from the file name 83 if self.__filename.suffix == '.bed': 84 self.__filename = self.__filename.with_suffix('') 85 86 # Save .fam file 87 log.info(f"Writing .fam file: {self.__filename}") 88 89 # Fill .fam file 90 fam_file = pd.DataFrame(columns=['fid', 'iid', 'father', 'mother', 'gender', 'trait']) 91 fam_file['iid'] = self.__snpobj.samples 92 fam_file['fid'] = self.__snpobj.samples 93 94 # Save .fam file 95 fam_file.to_csv(self.__filename.with_suffix('.fam'), sep='\t', index=False, header=False) 96 log.info(f"Finished writing .fam file: {self.__filename}") 97 98 # Save .bim file 99 log.info(f"Writing .bim file: {self.__filename}") 100 101 # Fill .bim file 102 bim_file = pd.DataFrame(columns=['chrom', 'snp', 'cm', 'pos', 'a0', 'a1']) 103 bim_file['chrom'] = self.__snpobj.variants_chrom 104 bim_file['snp'] = self.__snpobj.variants_id 105 bim_file['cm'] = 0 # TODO: read, save and write too if available? 106 log.warning("The .bim file is being saved with 0 cM values.") 107 bim_file['pos'] = self.__snpobj.variants_pos 108 bim_file['a0'] = self.__snpobj.variants_alt 109 bim_file['a1'] = self.__snpobj.variants_ref 110 111 # Save .bim file 112 bim_file.to_csv(self.__filename.with_suffix('.bim'), sep='\t', index=False, header=False) 113 log.info(f"Finished writing .bim file: {self.__filename}")
Writes an object in bed/bim/fam formats in the specified output path.
Arguments:
- snpobj: The SNPObject to be written.
- file: The output file path.
27 def write( 28 self, 29 rename_missing_values: bool = True, 30 before: Union[int, float, str] = -1, 31 after: Union[int, float, str] = '.' 32 ): 33 """ 34 Writes the SNPObject to bed/bim/fam formats. 35 36 Args: 37 rename_missing_values (bool, optional): 38 If True, renames potential missing values in `snpobj.calldata_gt` before writing. 39 Defaults to True. 40 before (int, float, or str, default=-1): 41 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 42 Default is -1. 43 after (int, float, or str, default='.'): 44 The value that will replace `before`. Default is '.'. 45 """ 46 # Save .bed file 47 if self.__filename.suffix != '.bed': 48 self.__filename = self.__filename.with_suffix('.bed') 49 50 log.info(f"Writing .bed file: {self.__filename}") 51 52 # Optionally rename potential missing values in `snpobj.calldata_gt` before writing 53 if rename_missing_values: 54 self.__snpobj.rename_missings(before=before, after=after, inplace=True) 55 56 # If the input matrix has three dimensions, it indicates that the data is divided into two strands. 57 if len(self.__snpobj.calldata_gt.shape) == 3: 58 # Sum the two strands 59 self.__snpobj.calldata_gt = self.__snpobj.calldata_gt.transpose(1, 0, 2).sum(axis=2, dtype=np.int8) 60 61 # Infer the number of samples and variants from the matrix 62 samples, variants = self.__snpobj.calldata_gt.shape 63 64 # Define the PgenWriter to save the data 65 data_save = pg.PgenWriter(filename=str(self.__filename).encode('utf-8'), 66 sample_ct=samples, 67 variant_ct=variants, 68 nonref_flags=True, 69 hardcall_phase_present=False, 70 dosage_present=True, 71 dosage_phase_present=False) 72 73 # Fill the data_save object with the matrix of individuals x variants 74 for snp_i in range(0, variants): 75 data_save.append_biallelic(np.ascontiguousarray(self.__snpobj.calldata_gt[:, snp_i])) 76 77 # Save the .bed file 78 data_save.close() 79 80 log.info(f"Finished writing .bed file: {self.__filename}") 81 82 # Remove .bed from the file name 83 if self.__filename.suffix == '.bed': 84 self.__filename = self.__filename.with_suffix('') 85 86 # Save .fam file 87 log.info(f"Writing .fam file: {self.__filename}") 88 89 # Fill .fam file 90 fam_file = pd.DataFrame(columns=['fid', 'iid', 'father', 'mother', 'gender', 'trait']) 91 fam_file['iid'] = self.__snpobj.samples 92 fam_file['fid'] = self.__snpobj.samples 93 94 # Save .fam file 95 fam_file.to_csv(self.__filename.with_suffix('.fam'), sep='\t', index=False, header=False) 96 log.info(f"Finished writing .fam file: {self.__filename}") 97 98 # Save .bim file 99 log.info(f"Writing .bim file: {self.__filename}") 100 101 # Fill .bim file 102 bim_file = pd.DataFrame(columns=['chrom', 'snp', 'cm', 'pos', 'a0', 'a1']) 103 bim_file['chrom'] = self.__snpobj.variants_chrom 104 bim_file['snp'] = self.__snpobj.variants_id 105 bim_file['cm'] = 0 # TODO: read, save and write too if available? 106 log.warning("The .bim file is being saved with 0 cM values.") 107 bim_file['pos'] = self.__snpobj.variants_pos 108 bim_file['a0'] = self.__snpobj.variants_alt 109 bim_file['a1'] = self.__snpobj.variants_ref 110 111 # Save .bim file 112 bim_file.to_csv(self.__filename.with_suffix('.bim'), sep='\t', index=False, header=False) 113 log.info(f"Finished writing .bim file: {self.__filename}")
Writes the SNPObject to bed/bim/fam formats.
Arguments:
- rename_missing_values (bool, optional): If True, renames potential missing values in
snpobj.calldata_gt
before writing. Defaults to True. - before (int, float, or str, default=-1): The current representation of missing values in
calldata_gt
. Common values might be -1, '.', or NaN. Default is -1. - after (int, float, or str, default='.'): The value that will replace
before
. Default is '.'.
15class PGENWriter: 16 """ 17 Writes a genotype object in PGEN format (.pgen, .psam, and .pvar files) in the specified output path. 18 """ 19 20 def __init__(self, snpobj: SNPObject, filename: str): 21 """ 22 Initializes the PGENWriter instance. 23 24 Args: 25 snpobj (SNPObject): The SNPObject containing genotype data to be written. 26 filename (str): Base path for the output files (excluding extension). 27 """ 28 self.__snpobj = snpobj 29 self.__filename = Path(filename) 30 31 def write( 32 self, 33 vzs: bool = False, 34 rename_missing_values: bool = True, 35 before: Union[int, float, str] = -1, 36 after: Union[int, float, str] = '.' 37 ): 38 """ 39 Writes the SNPObject data to .pgen, .psam, and .pvar files. 40 41 Args: 42 vzs (bool, optional): 43 If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False. 44 rename_missing_values (bool, optional): 45 If True, renames potential missing values in `snpobj.calldata_gt` before writing. 46 Defaults to True. 47 before (int, float, or str, default=-1): 48 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 49 Default is -1. 50 after (int, float, or str, default='.'): 51 The value that will replace `before`. Default is '.'. 52 """ 53 file_extensions = (".pgen", ".psam", ".pvar", ".pvar.zst") 54 if self.__filename.suffix in file_extensions: 55 self.__filename = self.__filename.with_suffix('') 56 57 # Optionally rename potential missing values in `snpobj.calldata_gt` before writing 58 if rename_missing_values: 59 self.__snpobj.rename_missings(before=before, after=after, inplace=True) 60 61 self.write_pvar(vzs=vzs) 62 self.write_psam() 63 self.write_pgen() 64 65 def write_pvar(self, vzs: bool = False): 66 """ 67 Writes variant data to the .pvar file. 68 69 Args: 70 vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False. 71 """ 72 output_filename = f"{self.__filename}.pvar" 73 if vzs: 74 output_filename += ".zst" 75 log.info(f"Writing to {output_filename} (compressed)") 76 else: 77 log.info(f"Writing to {output_filename}") 78 79 df = pl.DataFrame( 80 { 81 "#CHROM": self.__snpobj.variants_chrom, 82 "POS": self.__snpobj.variants_pos, 83 "ID": self.__snpobj.variants_id, 84 "REF": self.__snpobj.variants_ref, 85 "ALT": self.__snpobj.variants_alt, 86 "FILTER": self.__snpobj.variants_filter_pass, 87 # TODO: add INFO column to SNPObject and write it to the .pvar file? (if not it's lost) 88 } 89 ) 90 # TODO: add header to the .pvar file, if not it's lost 91 92 # Write the DataFrame to a CSV string 93 csv_data = df.write_csv(None, separator="\t") 94 95 if vzs: 96 # Compress the CSV data using zstd 97 cctx = zstd.ZstdCompressor() 98 compressed_data = cctx.compress(csv_data.encode('utf-8')) 99 with open(output_filename, 'wb') as f: 100 f.write(compressed_data) 101 else: 102 with open(output_filename, 'w') as f: 103 f.write(csv_data) 104 105 def write_psam(self): 106 """ 107 Writes sample metadata to the .psam file. 108 """ 109 log.info(f"Writing {self.__filename}.psam") 110 df = pl.DataFrame( 111 { 112 "#IID": self.__snpobj.samples, 113 "SEX": "NA", # Add SEX as nan for now 114 # TODO: add SEX as Optional column to SNPObject and write it to the .psam file (if not it's lost) 115 } 116 ) 117 df.write_csv(f"{self.__filename}.psam", separator="\t") 118 119 def write_pgen(self): 120 """ 121 Writes the genotype data to a .pgen file. 122 """ 123 log.info(f"Writing to {self.__filename}.pgen") 124 summed_strands = False if self.__snpobj.calldata_gt.ndim == 3 else True 125 if not summed_strands: 126 num_variants, num_samples, num_alleles = self.__snpobj.calldata_gt.shape 127 # Flatten the genotype matrix for pgenlib 128 flat_genotypes = self.__snpobj.calldata_gt.reshape( 129 num_variants, num_samples * num_alleles 130 ) 131 with pg.PgenWriter( 132 filename=f"{self.__filename}.pgen".encode('utf-8'), 133 sample_ct=num_samples, 134 variant_ct=num_variants, 135 hardcall_phase_present=True, 136 ) as writer: 137 for variant_index in range(num_variants): 138 writer.append_alleles( 139 flat_genotypes[variant_index].astype(np.int32), all_phased=True 140 ) 141 else: 142 num_variants, num_samples = self.__snpobj.calldata_gt.shape 143 # Transpose to (samples, variants) 144 genotypes = self.__snpobj.calldata_gt.T # Shape is (samples, variants) 145 with pg.PgenWriter( 146 filename=f"{self.__filename}.pgen".encode('utf-8'), 147 sample_ct=num_samples, 148 variant_ct=num_variants, 149 hardcall_phase_present=False, 150 ) as writer: 151 for variant_index in range(num_variants): 152 variant_genotypes = genotypes[:, variant_index].astype(np.int8) 153 # Map missing genotypes to -9 if necessary 154 variant_genotypes[variant_genotypes == -1] = -9 155 writer.append_biallelic(np.ascontiguousarray(variant_genotypes))
Writes a genotype object in PGEN format (.pgen, .psam, and .pvar files) in the specified output path.
20 def __init__(self, snpobj: SNPObject, filename: str): 21 """ 22 Initializes the PGENWriter instance. 23 24 Args: 25 snpobj (SNPObject): The SNPObject containing genotype data to be written. 26 filename (str): Base path for the output files (excluding extension). 27 """ 28 self.__snpobj = snpobj 29 self.__filename = Path(filename)
Initializes the PGENWriter instance.
Arguments:
- snpobj (SNPObject): The SNPObject containing genotype data to be written.
- filename (str): Base path for the output files (excluding extension).
31 def write( 32 self, 33 vzs: bool = False, 34 rename_missing_values: bool = True, 35 before: Union[int, float, str] = -1, 36 after: Union[int, float, str] = '.' 37 ): 38 """ 39 Writes the SNPObject data to .pgen, .psam, and .pvar files. 40 41 Args: 42 vzs (bool, optional): 43 If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False. 44 rename_missing_values (bool, optional): 45 If True, renames potential missing values in `snpobj.calldata_gt` before writing. 46 Defaults to True. 47 before (int, float, or str, default=-1): 48 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 49 Default is -1. 50 after (int, float, or str, default='.'): 51 The value that will replace `before`. Default is '.'. 52 """ 53 file_extensions = (".pgen", ".psam", ".pvar", ".pvar.zst") 54 if self.__filename.suffix in file_extensions: 55 self.__filename = self.__filename.with_suffix('') 56 57 # Optionally rename potential missing values in `snpobj.calldata_gt` before writing 58 if rename_missing_values: 59 self.__snpobj.rename_missings(before=before, after=after, inplace=True) 60 61 self.write_pvar(vzs=vzs) 62 self.write_psam() 63 self.write_pgen()
Writes the SNPObject data to .pgen, .psam, and .pvar files.
Arguments:
- vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
- rename_missing_values (bool, optional): If True, renames potential missing values in
snpobj.calldata_gt
before writing. Defaults to True. - before (int, float, or str, default=-1): The current representation of missing values in
calldata_gt
. Common values might be -1, '.', or NaN. Default is -1. - after (int, float, or str, default='.'): The value that will replace
before
. Default is '.'.
65 def write_pvar(self, vzs: bool = False): 66 """ 67 Writes variant data to the .pvar file. 68 69 Args: 70 vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False. 71 """ 72 output_filename = f"{self.__filename}.pvar" 73 if vzs: 74 output_filename += ".zst" 75 log.info(f"Writing to {output_filename} (compressed)") 76 else: 77 log.info(f"Writing to {output_filename}") 78 79 df = pl.DataFrame( 80 { 81 "#CHROM": self.__snpobj.variants_chrom, 82 "POS": self.__snpobj.variants_pos, 83 "ID": self.__snpobj.variants_id, 84 "REF": self.__snpobj.variants_ref, 85 "ALT": self.__snpobj.variants_alt, 86 "FILTER": self.__snpobj.variants_filter_pass, 87 # TODO: add INFO column to SNPObject and write it to the .pvar file? (if not it's lost) 88 } 89 ) 90 # TODO: add header to the .pvar file, if not it's lost 91 92 # Write the DataFrame to a CSV string 93 csv_data = df.write_csv(None, separator="\t") 94 95 if vzs: 96 # Compress the CSV data using zstd 97 cctx = zstd.ZstdCompressor() 98 compressed_data = cctx.compress(csv_data.encode('utf-8')) 99 with open(output_filename, 'wb') as f: 100 f.write(compressed_data) 101 else: 102 with open(output_filename, 'w') as f: 103 f.write(csv_data)
Writes variant data to the .pvar file.
Arguments:
- vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
105 def write_psam(self): 106 """ 107 Writes sample metadata to the .psam file. 108 """ 109 log.info(f"Writing {self.__filename}.psam") 110 df = pl.DataFrame( 111 { 112 "#IID": self.__snpobj.samples, 113 "SEX": "NA", # Add SEX as nan for now 114 # TODO: add SEX as Optional column to SNPObject and write it to the .psam file (if not it's lost) 115 } 116 ) 117 df.write_csv(f"{self.__filename}.psam", separator="\t")
Writes sample metadata to the .psam file.
119 def write_pgen(self): 120 """ 121 Writes the genotype data to a .pgen file. 122 """ 123 log.info(f"Writing to {self.__filename}.pgen") 124 summed_strands = False if self.__snpobj.calldata_gt.ndim == 3 else True 125 if not summed_strands: 126 num_variants, num_samples, num_alleles = self.__snpobj.calldata_gt.shape 127 # Flatten the genotype matrix for pgenlib 128 flat_genotypes = self.__snpobj.calldata_gt.reshape( 129 num_variants, num_samples * num_alleles 130 ) 131 with pg.PgenWriter( 132 filename=f"{self.__filename}.pgen".encode('utf-8'), 133 sample_ct=num_samples, 134 variant_ct=num_variants, 135 hardcall_phase_present=True, 136 ) as writer: 137 for variant_index in range(num_variants): 138 writer.append_alleles( 139 flat_genotypes[variant_index].astype(np.int32), all_phased=True 140 ) 141 else: 142 num_variants, num_samples = self.__snpobj.calldata_gt.shape 143 # Transpose to (samples, variants) 144 genotypes = self.__snpobj.calldata_gt.T # Shape is (samples, variants) 145 with pg.PgenWriter( 146 filename=f"{self.__filename}.pgen".encode('utf-8'), 147 sample_ct=num_samples, 148 variant_ct=num_variants, 149 hardcall_phase_present=False, 150 ) as writer: 151 for variant_index in range(num_variants): 152 variant_genotypes = genotypes[:, variant_index].astype(np.int8) 153 # Map missing genotypes to -9 if necessary 154 variant_genotypes[variant_genotypes == -1] = -9 155 writer.append_biallelic(np.ascontiguousarray(variant_genotypes))
Writes the genotype data to a .pgen file.
12class VCFWriter: 13 """ 14 A writer class for exporting SNP data from a `snputils.snp.genobj.SNPObject` 15 into an `.vcf` file. 16 """ 17 def __init__(self, snpobj: SNPObject, filename: str, n_jobs: int = -1, phased: bool = False): 18 """ 19 Args: 20 snpobj (SNPObject): 21 A SNPObject instance. 22 file (str or pathlib.Path): 23 Path to the file where the data will be saved. It should end with `.vcf`. 24 If the provided path does not have this extension, the `.vcf` extension will be appended. 25 n_jobs: 26 Number of jobs to run in parallel. 27 - `None`: use 1 job unless within a `joblib.parallel_backend` context. 28 - `-1`: use all available processors. 29 - Any other integer: use the specified number of jobs. 30 phased: 31 If True, genotype data is written in "maternal|paternal" format. 32 If False, genotype data is written in "maternal/paternal" format. 33 """ 34 self.__snpobj = snpobj 35 self.__filename = Path(filename) 36 self.__n_jobs = n_jobs 37 self.__phased = phased 38 39 def write( 40 self, 41 chrom_partition: bool = False, 42 rename_missing_values: bool = True, 43 before: Union[int, float, str] = -1, 44 after: Union[int, float, str] = '.' 45 ): 46 """ 47 Writes the SNP data to VCF file(s). 48 49 Args: 50 chrom_partition (bool, optional): 51 If True, individual VCF files are generated for each chromosome. 52 If False, a single VCF file containing data for all chromosomes is created. Defaults to False. 53 rename_missing_values (bool, optional): 54 If True, renames potential missing values in `snpobj.calldata_gt` before writing. 55 Defaults to True. 56 before (int, float, or str, default=-1): 57 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 58 Default is -1. 59 after (int, float, or str, default='.'): 60 The value that will replace `before`. Default is '.'. 61 """ 62 self.__chrom_partition = chrom_partition 63 64 file_extensions = (".vcf", ".bcf") 65 if self.__filename.suffix in file_extensions: 66 self.__file_extension = self.__filename.suffix 67 self.__filename = self.__filename.with_suffix('') 68 else: 69 self.__file_extension = ".vcf" 70 71 # Optionally rename potential missing values in `snpobj.calldata_gt` before writing 72 if rename_missing_values: 73 self.__snpobj.rename_missings(before=before, after=after, inplace=True) 74 75 data = self.__snpobj 76 77 if self.__chrom_partition: 78 chroms = data.unique_chrom 79 80 for chrom in chroms: 81 # Filter to include the data for the chromosome in particular 82 data_chrom = data.filter_variants(chrom=chrom, inplace=False) 83 84 log.debug(f'Storing chromosome {chrom}') 85 self._write_chromosome_data(chrom, data_chrom) 86 else: 87 self._write_chromosome_data("All", data) 88 89 def _write_chromosome_data(self, chrom, data_chrom): 90 """ 91 Writes the SNP data for a specific chromosome to a VCF file. 92 93 Args: 94 chrom: The chromosome name. 95 data_chrom: The SNPObject instance containing the data for the chromosome. 96 """ 97 # Obtain npy matrix with SNPs 98 npy3 = data_chrom.calldata_gt # shape: (n_windows, n_samples, 2) 99 n_windows, n_samples, _ = npy3.shape 100 101 # Keep sample names if appropriate 102 data_samples = data_chrom.samples if len(data_chrom.samples) == n_samples else [get_name() for _ in range(n_samples)] 103 104 # Format output file 105 if chrom == "All": 106 file = self.__filename.with_suffix(self.__file_extension) 107 else: 108 file = self.__filename.parent / f"{self.__filename.stem}_{chrom}{self.__file_extension}" 109 110 # Write VCF file 111 out = open(self.__filename.with_suffix(self.__file_extension), "w") 112 # --- write VCF header --- 113 out.write("##fileformat=VCFv4.1\n") 114 out.write('##FORMAT=<ID=GT,Number=1,Type=String,Description="Phased Genotype">\n') 115 116 for c in set(data_chrom.variants_chrom): 117 out.write(f"##contig=<ID={c}>\n") 118 cols = ["#CHROM","POS","ID","REF","ALT","QUAL","FILTER","INFO","FORMAT"] + list(data_chrom.samples) 119 out.write("\t".join(cols) + "\n") 120 121 sep = "|" if self.__phased else "/" 122 for i in range(n_windows): 123 chrom = data_chrom.variants_chrom[i] 124 pos = data_chrom.variants_pos[i] 125 vid = data_chrom.variants_id[i] 126 ref = data_chrom.variants_ref[i] 127 alt = data_chrom.variants_alt[i] 128 129 # build genotype list per sample, one small array at a time 130 row = npy3[i] # shape: (n_samples, 2) 131 genotypes = [ 132 f"{row[s,0]}{sep}{row[s,1]}" 133 for s in range(n_samples) 134 ] 135 136 line = "\t".join([ 137 str(chrom), str(pos), vid, ref, alt, 138 ".", "PASS", ".", "GT", *genotypes 139 ]) 140 out.write(line + "\n") 141 142 out.close()
A writer class for exporting SNP data from a snputils.snp.genobj.SNPObject
into an .vcf
file.
17 def __init__(self, snpobj: SNPObject, filename: str, n_jobs: int = -1, phased: bool = False): 18 """ 19 Args: 20 snpobj (SNPObject): 21 A SNPObject instance. 22 file (str or pathlib.Path): 23 Path to the file where the data will be saved. It should end with `.vcf`. 24 If the provided path does not have this extension, the `.vcf` extension will be appended. 25 n_jobs: 26 Number of jobs to run in parallel. 27 - `None`: use 1 job unless within a `joblib.parallel_backend` context. 28 - `-1`: use all available processors. 29 - Any other integer: use the specified number of jobs. 30 phased: 31 If True, genotype data is written in "maternal|paternal" format. 32 If False, genotype data is written in "maternal/paternal" format. 33 """ 34 self.__snpobj = snpobj 35 self.__filename = Path(filename) 36 self.__n_jobs = n_jobs 37 self.__phased = phased
Arguments:
- snpobj (SNPObject): A SNPObject instance.
- file (str or pathlib.Path): Path to the file where the data will be saved. It should end with
.vcf
. If the provided path does not have this extension, the.vcf
extension will be appended. - n_jobs: Number of jobs to run in parallel.
None
: use 1 job unless within ajoblib.parallel_backend
context.-1
: use all available processors.- Any other integer: use the specified number of jobs.
- phased: If True, genotype data is written in "maternal|paternal" format.
If False, genotype data is written in "maternal/paternal" format.
39 def write( 40 self, 41 chrom_partition: bool = False, 42 rename_missing_values: bool = True, 43 before: Union[int, float, str] = -1, 44 after: Union[int, float, str] = '.' 45 ): 46 """ 47 Writes the SNP data to VCF file(s). 48 49 Args: 50 chrom_partition (bool, optional): 51 If True, individual VCF files are generated for each chromosome. 52 If False, a single VCF file containing data for all chromosomes is created. Defaults to False. 53 rename_missing_values (bool, optional): 54 If True, renames potential missing values in `snpobj.calldata_gt` before writing. 55 Defaults to True. 56 before (int, float, or str, default=-1): 57 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 58 Default is -1. 59 after (int, float, or str, default='.'): 60 The value that will replace `before`. Default is '.'. 61 """ 62 self.__chrom_partition = chrom_partition 63 64 file_extensions = (".vcf", ".bcf") 65 if self.__filename.suffix in file_extensions: 66 self.__file_extension = self.__filename.suffix 67 self.__filename = self.__filename.with_suffix('') 68 else: 69 self.__file_extension = ".vcf" 70 71 # Optionally rename potential missing values in `snpobj.calldata_gt` before writing 72 if rename_missing_values: 73 self.__snpobj.rename_missings(before=before, after=after, inplace=True) 74 75 data = self.__snpobj 76 77 if self.__chrom_partition: 78 chroms = data.unique_chrom 79 80 for chrom in chroms: 81 # Filter to include the data for the chromosome in particular 82 data_chrom = data.filter_variants(chrom=chrom, inplace=False) 83 84 log.debug(f'Storing chromosome {chrom}') 85 self._write_chromosome_data(chrom, data_chrom) 86 else: 87 self._write_chromosome_data("All", data)
Writes the SNP data to VCF file(s).
Arguments:
- chrom_partition (bool, optional): If True, individual VCF files are generated for each chromosome. If False, a single VCF file containing data for all chromosomes is created. Defaults to False.
- rename_missing_values (bool, optional): If True, renames potential missing values in
snpobj.calldata_gt
before writing. Defaults to True. - before (int, float, or str, default=-1): The current representation of missing values in
calldata_gt
. Common values might be -1, '.', or NaN. Default is -1. - after (int, float, or str, default='.'): The value that will replace
before
. Default is '.'.