snputils.snp.io
1from .read import SNPReader, BEDReader, PGENReader, VCFReader, read_snp, read_bed, read_pgen, read_vcf 2from .write import BEDWriter, PGENWriter, VCFWriter 3 4__all__ = ['read_snp', 'read_bed', 'read_pgen', 'read_vcf', 5 'SNPReader', 'BEDReader', 'PGENReader', 'VCFReader', 6 'BEDWriter', 'PGENWriter', 'VCFWriter']
8def read_snp(filename: Union[str, pathlib.Path], **kwargs) -> SNPObject: 9 """ 10 Automatically detect the file format and read it into a SNPObject. 11 12 Args: 13 filename: Filename of the file to read. 14 **kwargs: Additional arguments passed to the reader method. 15 16 Raises: 17 ValueError: If the filename does not have an extension or the extension is not supported. 18 """ 19 from snputils.snp.io.read.auto import SNPReader 20 21 return SNPReader(filename).read(**kwargs)
Automatically detect the file format and read it into a SNPObject.
Arguments:
- filename: Filename of the file to read.
- **kwargs: Additional arguments passed to the reader method.
Raises:
- ValueError: If the filename does not have an extension or the extension is not supported.
24def read_bed(filename: Union[str, pathlib.Path], **kwargs) -> SNPObject: 25 """ 26 Read a BED fileset into a SNPObject. 27 28 Args: 29 filename: Filename of the BED fileset to read. 30 **kwargs: Additional arguments passed to the reader method. See :class:`snputils.snp.io.read.bed.BEDReader` for possible parameters. 31 """ 32 from snputils.snp.io.read.bed import BEDReader 33 34 return BEDReader(filename).read(**kwargs)
Read a BED fileset into a SNPObject.
Arguments:
- filename: Filename of the BED fileset to read.
- **kwargs: Additional arguments passed to the reader method. See
snputils.snp.io.read.bed.BEDReader
for possible parameters.
37def read_pgen(filename: Union[str, pathlib.Path], **kwargs) -> SNPObject: 38 """ 39 Read a PGEN fileset into a SNPObject. 40 41 Args: 42 filename: Filename of the PGEN fileset to read. 43 **kwargs: Additional arguments passed to the reader method. See :class:`snputils.snp.io.read.pgen.PGENReader` for possible parameters. 44 """ 45 from snputils.snp.io.read.pgen import PGENReader 46 47 return PGENReader(filename).read(**kwargs)
Read a PGEN fileset into a SNPObject.
Arguments:
- filename: Filename of the PGEN fileset to read.
- **kwargs: Additional arguments passed to the reader method. See
snputils.snp.io.read.pgen.PGENReader
for possible parameters.
50def read_vcf(filename: Union[str, pathlib.Path], 51 backend: str = 'polars', 52 **kwargs) -> SNPObject: 53 """ 54 Read a VCF fileset into a SNPObject. 55 56 Args: 57 filename: Filename of the VCF fileset to read. 58 backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. 59 **kwargs: Additional arguments passed to the reader method. See :class:`snputils.snp.io.read.vcf.VCFReader` for possible parameters. 60 """ 61 from snputils.snp.io.read.vcf import VCFReader, VCFReaderPolars 62 if backend == 'polars': 63 print(f"Reading {filename} with polars backend") 64 return VCFReaderPolars(filename).read(**kwargs) 65 else: 66 print(f"Reading {filename} with scikit-allel backend") 67 return VCFReader(filename).read(**kwargs)
Read a VCF fileset into a SNPObject.
Arguments:
- filename: Filename of the VCF fileset to read.
- backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'.
- **kwargs: Additional arguments passed to the reader method. See
snputils.snp.io.read.vcf.VCFReader
for possible parameters.
8class SNPReader: 9 def __new__(cls, 10 filename: Union[str, pathlib.Path], 11 vcf_backend: str = 'polars') -> SNPReader: 12 """ 13 Automatically detect the SNP file format from the file extension, and return its corresponding reader. 14 15 Args: 16 filename: Filename of the file to read. 17 vcf_backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. Default is 'polars'. 18 19 Raises: 20 ValueError: If the filename does not have an extension or the extension is not supported. 21 """ 22 filename = pathlib.Path(filename) 23 suffixes = filename.suffixes 24 if not suffixes: 25 raise ValueError("The filename should have an extension when using SNPReader.") 26 27 extension = suffixes[-2] if suffixes[-1].lower() in (".zst", ".gz") else suffixes[-1] 28 extension = extension.lower() 29 30 if extension == ".vcf": 31 if vcf_backend == 'polars': 32 from snputils.snp.io.read.vcf import VCFReaderPolars 33 34 return VCFReaderPolars(filename) 35 elif vcf_backend == 'scikit-allel': 36 from snputils.snp.io.read.vcf import VCFReader 37 38 return VCFReader(filename) 39 else: 40 raise ValueError(f"VCF backend not supported: {vcf_backend}") 41 elif extension in (".bed", ".bim", ".fam"): 42 from snputils.snp.io.read.bed import BEDReader 43 44 return BEDReader(filename) 45 elif extension in (".pgen", ".pvar", ".psam", ".pvar.zst"): 46 from snputils.snp.io.read.pgen import PGENReader 47 48 return PGENReader(filename) 49 else: 50 raise ValueError(f"File format not supported: {filename}")
9 def __new__(cls, 10 filename: Union[str, pathlib.Path], 11 vcf_backend: str = 'polars') -> SNPReader: 12 """ 13 Automatically detect the SNP file format from the file extension, and return its corresponding reader. 14 15 Args: 16 filename: Filename of the file to read. 17 vcf_backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. Default is 'polars'. 18 19 Raises: 20 ValueError: If the filename does not have an extension or the extension is not supported. 21 """ 22 filename = pathlib.Path(filename) 23 suffixes = filename.suffixes 24 if not suffixes: 25 raise ValueError("The filename should have an extension when using SNPReader.") 26 27 extension = suffixes[-2] if suffixes[-1].lower() in (".zst", ".gz") else suffixes[-1] 28 extension = extension.lower() 29 30 if extension == ".vcf": 31 if vcf_backend == 'polars': 32 from snputils.snp.io.read.vcf import VCFReaderPolars 33 34 return VCFReaderPolars(filename) 35 elif vcf_backend == 'scikit-allel': 36 from snputils.snp.io.read.vcf import VCFReader 37 38 return VCFReader(filename) 39 else: 40 raise ValueError(f"VCF backend not supported: {vcf_backend}") 41 elif extension in (".bed", ".bim", ".fam"): 42 from snputils.snp.io.read.bed import BEDReader 43 44 return BEDReader(filename) 45 elif extension in (".pgen", ".pvar", ".psam", ".pvar.zst"): 46 from snputils.snp.io.read.pgen import PGENReader 47 48 return PGENReader(filename) 49 else: 50 raise ValueError(f"File format not supported: {filename}")
Automatically detect the SNP file format from the file extension, and return its corresponding reader.
Arguments:
- filename: Filename of the file to read.
- vcf_backend: Backend to use for reading the VCF file. Options are 'polars' or 'scikit-allel'. Default is 'polars'.
Raises:
- ValueError: If the filename does not have an extension or the extension is not supported.
16@SNPBaseReader.register 17class BEDReader(SNPBaseReader): 18 def read( 19 self, 20 fields: Optional[List[str]] = None, 21 exclude_fields: Optional[List[str]] = None, 22 sample_ids: Optional[np.ndarray] = None, 23 sample_idxs: Optional[np.ndarray] = None, 24 variant_ids: Optional[np.ndarray] = None, 25 variant_idxs: Optional[np.ndarray] = None, 26 sum_strands: bool = False, 27 separator: Optional[str] = None, 28 ) -> SNPObject: 29 """ 30 Read a bed fileset (bed, bim, fam) into a SNPObject. 31 32 Args: 33 fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. 34 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. 35 To extract all fields, set fields to None. Defaults to None. 36 exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. 37 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. 38 To exclude no fields, set exclude_fields to None. Defaults to None. 39 sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read. 40 sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read. 41 variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read. 42 variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read. 43 sum_strands: True if the maternal and paternal strands are to be summed together, 44 False if the strands are to be stored separately. Note that due to the pgenlib backend, when sum_strands is False, 45 8 times as much RAM is required. Nonetheless, the calldata_gt will only be double the size. 46 WARNING: bed files do not store phase information. If you need it, use vcf or pgen. 47 separator: Separator used in the pvar file. If None, the separator is automatically detected. 48 If the automatic detection fails, please specify the separator manually. 49 50 Returns: 51 snpobj: SNPObject containing the data from the pgen fileset. 52 If sum_strands is False, calldata_gt is stored as a numpy array of shape 53 (num_variants, num_samples, 2) and dtype int8 containing 0, 1. 54 If sum_strands is True, calldata_gt is stored as a numpy array of shape 55 (num_variants, num_samples) and dtype int8 containing 0, 1, 2. 56 57 Raises: 58 AssertionError: If both sample_idxs and sample_ids are specified. 59 AssertionError: If both variant_idxs and variant_ids are specified. 60 """ 61 assert ( 62 sample_idxs is None or sample_ids is None 63 ), "Only one of sample_idxs and sample_ids can be specified" 64 assert ( 65 variant_idxs is None or variant_ids is None 66 ), "Only one of variant_idxs and variant_ids can be specified" 67 68 if isinstance(fields, str): 69 fields = [fields] 70 if isinstance(exclude_fields, str): 71 exclude_fields = [exclude_fields] 72 73 fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS"] 74 exclude_fields = exclude_fields or [] 75 fields = [field for field in fields if field not in exclude_fields] 76 only_read_bed = fields == ["GT"] and variant_idxs is None and sample_idxs is None 77 78 filename_noext = str(self.filename) 79 if filename_noext[-4:].lower() in (".bed", ".bim", ".fam"): 80 filename_noext = filename_noext[:-4] 81 82 if only_read_bed: 83 with open(filename_noext + '.fam', 'r') as f: 84 file_num_samples = sum(1 for _ in f) # Get sample count from fam file 85 file_num_variants = None # Not needed 86 else: 87 log.info(f"Reading {filename_noext}.bim") 88 89 if separator is None: 90 with open(filename_noext + ".bim", "r") as file: 91 separator = csv.Sniffer().sniff(file.readline()).delimiter 92 93 bim = pl.read_csv( 94 filename_noext + ".bim", 95 separator=separator, 96 has_header=False, 97 new_columns=["#CHROM", "ID", "CM", "POS", "ALT", "REF"], 98 schema_overrides={ 99 "#CHROM": pl.String, 100 "ID": pl.String, 101 "CM": pl.Float64, 102 "POS": pl.Int64, 103 "ALT": pl.String, 104 "REF": pl.String 105 }, 106 null_values=["NA"] 107 ).with_row_index() 108 file_num_variants = bim.height 109 110 if variant_ids is not None: 111 variant_idxs = bim.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy() 112 113 if variant_idxs is None: 114 num_variants = file_num_variants 115 variant_idxs = np.arange(num_variants, dtype=np.uint32) 116 else: 117 num_variants = np.size(variant_idxs) 118 variant_idxs = np.array(variant_idxs, dtype=np.uint32) 119 bim = bim.filter(pl.col("index").is_in(variant_idxs)) 120 121 log.info(f"Reading {filename_noext}.fam") 122 123 fam = pl.read_csv( 124 filename_noext + ".fam", 125 separator=separator, 126 has_header=False, 127 new_columns=["Family ID", "IID", "Father ID", 128 "Mother ID", "Sex code", "Phenotype value"], 129 schema_overrides={ 130 "Family ID": pl.String, 131 "IID": pl.String, 132 "Father ID": pl.String, 133 "Mother ID": pl.String, 134 "Sex code": pl.String, 135 }, 136 null_values=["NA"] 137 ).with_row_index() 138 file_num_samples = fam.height 139 140 if sample_ids is not None: 141 sample_idxs = fam.filter(pl.col("IID").is_in(sample_ids)).select("index").to_series().to_numpy() 142 143 if sample_idxs is None: 144 num_samples = file_num_samples 145 else: 146 num_samples = np.size(sample_idxs) 147 sample_idxs = np.array(sample_idxs, dtype=np.uint32) 148 fam = fam.filter(pl.col("index").is_in(sample_idxs)) 149 150 if "GT" in fields: 151 log.info(f"Reading {filename_noext}.bed") 152 pgen_reader = pg.PgenReader( 153 str.encode(filename_noext + ".bed"), 154 raw_sample_ct=file_num_samples, 155 variant_ct=file_num_variants, 156 sample_subset=sample_idxs, 157 ) 158 159 if only_read_bed: 160 num_samples = pgen_reader.get_raw_sample_ct() 161 num_variants = pgen_reader.get_variant_ct() 162 variant_idxs = np.arange(num_variants, dtype=np.uint32) 163 164 # required arrays: variant_idxs + sample_idxs + genotypes 165 if not sum_strands: 166 required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4 167 else: 168 required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples 169 log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each") 170 171 if not sum_strands: 172 genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32) # cannot use int8 because of pgenlib 173 pgen_reader.read_alleles_list(variant_idxs, genotypes) 174 genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2)) 175 else: 176 genotypes = np.empty((num_variants, num_samples), dtype=np.int8) 177 pgen_reader.read_list(variant_idxs, genotypes) 178 pgen_reader.close() 179 else: 180 genotypes = None 181 182 log.info("Constructing SNPObject") 183 184 snpobj = SNPObject( 185 calldata_gt=genotypes if "GT" in fields else None, 186 samples=fam.get_column("IID").to_numpy() if "IID" in fields and "IID" in fam.columns else None, 187 **{f'variants_{k.lower()}': bim.get_column(v).to_numpy() if v in fields and v in bim.columns else None 188 for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS'}.items()} 189 ) 190 191 log.info("Finished constructing SNPObject") 192 return snpobj
Abstract class for SNP readers.
Attributes:
- _filename: The path to the file storing SNP data.
18 def read( 19 self, 20 fields: Optional[List[str]] = None, 21 exclude_fields: Optional[List[str]] = None, 22 sample_ids: Optional[np.ndarray] = None, 23 sample_idxs: Optional[np.ndarray] = None, 24 variant_ids: Optional[np.ndarray] = None, 25 variant_idxs: Optional[np.ndarray] = None, 26 sum_strands: bool = False, 27 separator: Optional[str] = None, 28 ) -> SNPObject: 29 """ 30 Read a bed fileset (bed, bim, fam) into a SNPObject. 31 32 Args: 33 fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. 34 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. 35 To extract all fields, set fields to None. Defaults to None. 36 exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. 37 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. 38 To exclude no fields, set exclude_fields to None. Defaults to None. 39 sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read. 40 sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read. 41 variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read. 42 variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read. 43 sum_strands: True if the maternal and paternal strands are to be summed together, 44 False if the strands are to be stored separately. Note that due to the pgenlib backend, when sum_strands is False, 45 8 times as much RAM is required. Nonetheless, the calldata_gt will only be double the size. 46 WARNING: bed files do not store phase information. If you need it, use vcf or pgen. 47 separator: Separator used in the pvar file. If None, the separator is automatically detected. 48 If the automatic detection fails, please specify the separator manually. 49 50 Returns: 51 snpobj: SNPObject containing the data from the pgen fileset. 52 If sum_strands is False, calldata_gt is stored as a numpy array of shape 53 (num_variants, num_samples, 2) and dtype int8 containing 0, 1. 54 If sum_strands is True, calldata_gt is stored as a numpy array of shape 55 (num_variants, num_samples) and dtype int8 containing 0, 1, 2. 56 57 Raises: 58 AssertionError: If both sample_idxs and sample_ids are specified. 59 AssertionError: If both variant_idxs and variant_ids are specified. 60 """ 61 assert ( 62 sample_idxs is None or sample_ids is None 63 ), "Only one of sample_idxs and sample_ids can be specified" 64 assert ( 65 variant_idxs is None or variant_ids is None 66 ), "Only one of variant_idxs and variant_ids can be specified" 67 68 if isinstance(fields, str): 69 fields = [fields] 70 if isinstance(exclude_fields, str): 71 exclude_fields = [exclude_fields] 72 73 fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS"] 74 exclude_fields = exclude_fields or [] 75 fields = [field for field in fields if field not in exclude_fields] 76 only_read_bed = fields == ["GT"] and variant_idxs is None and sample_idxs is None 77 78 filename_noext = str(self.filename) 79 if filename_noext[-4:].lower() in (".bed", ".bim", ".fam"): 80 filename_noext = filename_noext[:-4] 81 82 if only_read_bed: 83 with open(filename_noext + '.fam', 'r') as f: 84 file_num_samples = sum(1 for _ in f) # Get sample count from fam file 85 file_num_variants = None # Not needed 86 else: 87 log.info(f"Reading {filename_noext}.bim") 88 89 if separator is None: 90 with open(filename_noext + ".bim", "r") as file: 91 separator = csv.Sniffer().sniff(file.readline()).delimiter 92 93 bim = pl.read_csv( 94 filename_noext + ".bim", 95 separator=separator, 96 has_header=False, 97 new_columns=["#CHROM", "ID", "CM", "POS", "ALT", "REF"], 98 schema_overrides={ 99 "#CHROM": pl.String, 100 "ID": pl.String, 101 "CM": pl.Float64, 102 "POS": pl.Int64, 103 "ALT": pl.String, 104 "REF": pl.String 105 }, 106 null_values=["NA"] 107 ).with_row_index() 108 file_num_variants = bim.height 109 110 if variant_ids is not None: 111 variant_idxs = bim.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy() 112 113 if variant_idxs is None: 114 num_variants = file_num_variants 115 variant_idxs = np.arange(num_variants, dtype=np.uint32) 116 else: 117 num_variants = np.size(variant_idxs) 118 variant_idxs = np.array(variant_idxs, dtype=np.uint32) 119 bim = bim.filter(pl.col("index").is_in(variant_idxs)) 120 121 log.info(f"Reading {filename_noext}.fam") 122 123 fam = pl.read_csv( 124 filename_noext + ".fam", 125 separator=separator, 126 has_header=False, 127 new_columns=["Family ID", "IID", "Father ID", 128 "Mother ID", "Sex code", "Phenotype value"], 129 schema_overrides={ 130 "Family ID": pl.String, 131 "IID": pl.String, 132 "Father ID": pl.String, 133 "Mother ID": pl.String, 134 "Sex code": pl.String, 135 }, 136 null_values=["NA"] 137 ).with_row_index() 138 file_num_samples = fam.height 139 140 if sample_ids is not None: 141 sample_idxs = fam.filter(pl.col("IID").is_in(sample_ids)).select("index").to_series().to_numpy() 142 143 if sample_idxs is None: 144 num_samples = file_num_samples 145 else: 146 num_samples = np.size(sample_idxs) 147 sample_idxs = np.array(sample_idxs, dtype=np.uint32) 148 fam = fam.filter(pl.col("index").is_in(sample_idxs)) 149 150 if "GT" in fields: 151 log.info(f"Reading {filename_noext}.bed") 152 pgen_reader = pg.PgenReader( 153 str.encode(filename_noext + ".bed"), 154 raw_sample_ct=file_num_samples, 155 variant_ct=file_num_variants, 156 sample_subset=sample_idxs, 157 ) 158 159 if only_read_bed: 160 num_samples = pgen_reader.get_raw_sample_ct() 161 num_variants = pgen_reader.get_variant_ct() 162 variant_idxs = np.arange(num_variants, dtype=np.uint32) 163 164 # required arrays: variant_idxs + sample_idxs + genotypes 165 if not sum_strands: 166 required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4 167 else: 168 required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples 169 log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each") 170 171 if not sum_strands: 172 genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32) # cannot use int8 because of pgenlib 173 pgen_reader.read_alleles_list(variant_idxs, genotypes) 174 genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2)) 175 else: 176 genotypes = np.empty((num_variants, num_samples), dtype=np.int8) 177 pgen_reader.read_list(variant_idxs, genotypes) 178 pgen_reader.close() 179 else: 180 genotypes = None 181 182 log.info("Constructing SNPObject") 183 184 snpobj = SNPObject( 185 calldata_gt=genotypes if "GT" in fields else None, 186 samples=fam.get_column("IID").to_numpy() if "IID" in fields and "IID" in fam.columns else None, 187 **{f'variants_{k.lower()}': bim.get_column(v).to_numpy() if v in fields and v in bim.columns else None 188 for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS'}.items()} 189 ) 190 191 log.info("Finished constructing SNPObject") 192 return snpobj
Read a bed fileset (bed, bim, fam) into a SNPObject.
Arguments:
- fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. To extract all fields, set fields to None. Defaults to None.
- exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS'. To exclude no fields, set exclude_fields to None. Defaults to None.
- sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
- sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
- variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
- variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
- sum_strands: True if the maternal and paternal strands are to be summed together, False if the strands are to be stored separately. Note that due to the pgenlib backend, when sum_strands is False, 8 times as much RAM is required. Nonetheless, the calldata_gt will only be double the size. WARNING: bed files do not store phase information. If you need it, use vcf or pgen.
- separator: Separator used in the pvar file. If None, the separator is automatically detected. If the automatic detection fails, please specify the separator manually.
Returns:
snpobj: SNPObject containing the data from the pgen fileset. If sum_strands is False, calldata_gt is stored as a numpy array of shape (num_variants, num_samples, 2) and dtype int8 containing 0, 1. If sum_strands is True, calldata_gt is stored as a numpy array of shape (num_variants, num_samples) and dtype int8 containing 0, 1, 2.
Raises:
- AssertionError: If both sample_idxs and sample_ids are specified.
- AssertionError: If both variant_idxs and variant_ids are specified.
17@SNPBaseReader.register 18class PGENReader(SNPBaseReader): 19 def read( 20 self, 21 fields: Optional[List[str]] = None, 22 exclude_fields: Optional[List[str]] = None, 23 sample_ids: Optional[np.ndarray] = None, 24 sample_idxs: Optional[np.ndarray] = None, 25 variant_ids: Optional[np.ndarray] = None, 26 variant_idxs: Optional[np.ndarray] = None, 27 sum_strands: bool = False, 28 separator: str = None, 29 ) -> SNPObject: 30 """ 31 Read a pgen fileset (pgen, psam, pvar) into a SNPObject. 32 33 Args: 34 fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. 35 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. 36 To extract all fields, set fields to None. Defaults to None. 37 exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. 38 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. 39 To exclude no fields, set exclude_fields to None. Defaults to None. 40 sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read. 41 sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read. 42 variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read. 43 variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read. 44 sum_strands: True if the maternal and paternal strands are to be summed together, 45 False if the strands are to be stored separately. Note that due to the pgenlib backend, when sum_strands is False, 46 8 times as much RAM is required. Nonetheless, the calldata_gt will only be double the size. 47 separator: Separator used in the pvar file. If None, the separator is automatically detected. 48 If the automatic detection fails, please specify the separator manually. 49 50 Returns: 51 snpobj: SNPObject containing the data from the pgen fileset. 52 If sum_strands is False, calldata_gt is stored as a numpy array of shape 53 (num_variants, num_samples, 2) and dtype int8 containing 0, 1. 54 If sum_strands is True, calldata_gt is stored as a numpy array of shape 55 (num_variants, num_samples) and dtype int8 containing 0, 1, 2. 56 57 Raises: 58 AssertionError: If both sample_idxs and sample_ids are specified. 59 AssertionError: If both variant_idxs and variant_ids are specified. 60 """ 61 assert ( 62 sample_idxs is None or sample_ids is None 63 ), "Only one of sample_idxs and sample_ids can be specified" 64 assert ( 65 variant_idxs is None or variant_ids is None 66 ), "Only one of variant_idxs and variant_ids can be specified" 67 68 if isinstance(fields, str): 69 fields = [fields] 70 if isinstance(exclude_fields, str): 71 exclude_fields = [exclude_fields] 72 73 fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS", "FILTER", "QUAL"] 74 exclude_fields = exclude_fields or [] 75 fields = [field for field in fields if field not in exclude_fields] 76 only_read_pgen = fields == ["GT"] and variant_idxs is None and sample_idxs is None 77 78 filename_noext = str(self.filename) 79 for ext in [".pgen", ".pvar", ".pvar.zst", ".psam"]: 80 if filename_noext.endswith(ext): 81 filename_noext = filename_noext[:-len(ext)] 82 break 83 84 if only_read_pgen: 85 file_num_samples = None # Not needed for pgen 86 file_num_variants = None # Not needed 87 else: 88 pvar_extensions = [".pvar", ".pvar.zst"] 89 pvar_filename = None 90 for ext in pvar_extensions: 91 possible_pvar = filename_noext + ext 92 if os.path.exists(possible_pvar): 93 pvar_filename = possible_pvar 94 break 95 if pvar_filename is None: 96 raise FileNotFoundError(f"No .pvar or .pvar.zst file found for {filename_noext}") 97 98 log.info(f"Reading {pvar_filename}") 99 100 def open_textfile(filename): 101 if filename.endswith('.zst'): 102 import zstandard as zstd 103 return zstd.open(filename, 'rt') 104 else: 105 return open(filename, 'rt') 106 107 pvar_has_header = True 108 pvar_header_line_num = 0 109 with open_textfile(pvar_filename) as file: 110 for line_num, line in enumerate(file): 111 if line.startswith("##"): # Metadata 112 continue 113 else: 114 if separator is None: 115 separator = csv.Sniffer().sniff(file.readline()).delimiter 116 if line.startswith("#CHROM"): # Header 117 pvar_header_line_num = line_num 118 header = line.strip().split() 119 break 120 elif not line.startswith("#"): # If no header, look at line 1 121 pvar_has_header = False 122 cols_in_pvar = len(line.strip().split(separator)) 123 if cols_in_pvar == 5: 124 header = ["#CHROM", "ID", "POS", "ALT", "REF"] 125 elif cols_in_pvar == 6: 126 header = ["#CHROM", "ID", "CM", "POS", "ALT", "REF"] 127 else: 128 raise ValueError( 129 f"{pvar_filename} is not a valid pvar file." 130 ) 131 break 132 133 pvar_reading_args = { 134 'separator': separator, 135 'skip_rows': pvar_header_line_num, 136 'has_header': pvar_has_header, 137 'new_columns': None if pvar_has_header else header, 138 'schema_overrides': { 139 "#CHROM": pl.String, 140 "POS": pl.UInt32, 141 "ID": pl.String, 142 "REF": pl.String, 143 "ALT": pl.String, 144 }, 145 'null_values': ["NA"], 146 } 147 if pvar_filename.endswith('.zst'): 148 pvar = pl.read_csv(pvar_filename, **pvar_reading_args).lazy() 149 else: 150 pvar = pl.scan_csv(pvar_filename, **pvar_reading_args) 151 152 # We need to track row positions of variants but doing this with lazy operations would 153 # defeat predicate pushdown optimization and force loading the entire DataFrame. 154 # Solution: We only materialize the ID column with indices, keeping memory usage minimal 155 # while maintaining the ability to map between variant IDs and their file positions. 156 variant_id_idxs = pvar.select("ID").with_row_index().collect() 157 158 file_num_variants = variant_id_idxs.height 159 160 if variant_ids is not None: 161 num_variants = np.size(variant_ids) 162 pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect() 163 variant_idxs = variant_id_idxs.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy() 164 elif variant_idxs is not None: 165 num_variants = np.size(variant_idxs) 166 variant_idxs = np.array(variant_idxs, dtype=np.uint32) 167 variant_ids = variant_id_idxs.filter(pl.col("index").is_in(variant_idxs)).select("ID") 168 pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect() 169 else: 170 num_variants = file_num_variants 171 variant_idxs = np.arange(num_variants, dtype=np.uint32) 172 pvar = pvar.collect() 173 174 log.info(f"Reading {filename_noext}.psam") 175 176 with open(filename_noext + ".psam") as file: 177 first_line = file.readline().strip() 178 psam_has_header = first_line.startswith(("#FID", "FID", "#IID", "IID")) 179 180 psam = pl.read_csv( 181 filename_noext + ".psam", 182 separator=separator, 183 has_header=psam_has_header, 184 new_columns=None if psam_has_header else ["FID", "IID", "PAT", "MAT", "SEX", "PHENO1"], 185 null_values=["NA"], 186 ).with_row_index() 187 if "#IID" in psam.columns: 188 psam = psam.rename({"#IID": "IID"}) 189 if "#FID" in psam.columns: 190 psam = psam.rename({"#FID": "FID"}) 191 192 file_num_samples = psam.height 193 194 if sample_ids is not None: 195 num_samples = np.size(sample_ids) 196 psam = psam.filter(pl.col("IID").is_in(sample_ids)) 197 sample_idxs = psam.select("index").to_series().to_numpy() 198 elif sample_idxs is not None: 199 num_samples = np.size(sample_idxs) 200 sample_idxs = np.array(sample_idxs, dtype=np.uint32) 201 psam = psam.filter(pl.col("index").is_in(sample_idxs)) 202 else: 203 num_samples = file_num_samples 204 205 if "GT" in fields: 206 log.info(f"Reading {filename_noext}.pgen") 207 pgen_reader = pg.PgenReader( 208 str.encode(filename_noext + ".pgen"), 209 raw_sample_ct=file_num_samples, 210 variant_ct=file_num_variants, 211 sample_subset=sample_idxs, 212 ) 213 214 if only_read_pgen: 215 num_samples = pgen_reader.get_raw_sample_ct() 216 num_variants = pgen_reader.get_variant_ct() 217 variant_idxs = np.arange(num_variants, dtype=np.uint32) 218 219 # required arrays: variant_idxs + sample_idxs + genotypes 220 if not sum_strands: 221 required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4 222 else: 223 required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples 224 log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each") 225 226 if not sum_strands: 227 genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32) # cannot use int8 because of pgenlib 228 pgen_reader.read_alleles_list(variant_idxs, genotypes) 229 genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2)) 230 else: 231 genotypes = np.empty((num_variants, num_samples), dtype=np.int8) 232 pgen_reader.read_list(variant_idxs, genotypes) 233 pgen_reader.close() 234 else: 235 genotypes = None 236 237 log.info("Constructing SNPObject") 238 239 snpobj = SNPObject( 240 calldata_gt=genotypes if "GT" in fields else None, 241 samples=psam.get_column("IID").to_numpy() if "IID" in fields and "IID" in psam.columns else None, 242 **{f'variants_{k.lower()}': pvar.get_column(v).to_numpy() if v in fields and v in pvar.columns else None 243 for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS', 'filter_pass': 'FILTER', 'qual': 'QUAL'}.items()} 244 ) 245 246 log.info("Finished constructing SNPObject") 247 return snpobj
Abstract class for SNP readers.
Attributes:
- _filename: The path to the file storing SNP data.
19 def read( 20 self, 21 fields: Optional[List[str]] = None, 22 exclude_fields: Optional[List[str]] = None, 23 sample_ids: Optional[np.ndarray] = None, 24 sample_idxs: Optional[np.ndarray] = None, 25 variant_ids: Optional[np.ndarray] = None, 26 variant_idxs: Optional[np.ndarray] = None, 27 sum_strands: bool = False, 28 separator: str = None, 29 ) -> SNPObject: 30 """ 31 Read a pgen fileset (pgen, psam, pvar) into a SNPObject. 32 33 Args: 34 fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. 35 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. 36 To extract all fields, set fields to None. Defaults to None. 37 exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. 38 Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. 39 To exclude no fields, set exclude_fields to None. Defaults to None. 40 sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read. 41 sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read. 42 variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read. 43 variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read. 44 sum_strands: True if the maternal and paternal strands are to be summed together, 45 False if the strands are to be stored separately. Note that due to the pgenlib backend, when sum_strands is False, 46 8 times as much RAM is required. Nonetheless, the calldata_gt will only be double the size. 47 separator: Separator used in the pvar file. If None, the separator is automatically detected. 48 If the automatic detection fails, please specify the separator manually. 49 50 Returns: 51 snpobj: SNPObject containing the data from the pgen fileset. 52 If sum_strands is False, calldata_gt is stored as a numpy array of shape 53 (num_variants, num_samples, 2) and dtype int8 containing 0, 1. 54 If sum_strands is True, calldata_gt is stored as a numpy array of shape 55 (num_variants, num_samples) and dtype int8 containing 0, 1, 2. 56 57 Raises: 58 AssertionError: If both sample_idxs and sample_ids are specified. 59 AssertionError: If both variant_idxs and variant_ids are specified. 60 """ 61 assert ( 62 sample_idxs is None or sample_ids is None 63 ), "Only one of sample_idxs and sample_ids can be specified" 64 assert ( 65 variant_idxs is None or variant_ids is None 66 ), "Only one of variant_idxs and variant_ids can be specified" 67 68 if isinstance(fields, str): 69 fields = [fields] 70 if isinstance(exclude_fields, str): 71 exclude_fields = [exclude_fields] 72 73 fields = fields or ["GT", "IID", "REF", "ALT", "#CHROM", "ID", "POS", "FILTER", "QUAL"] 74 exclude_fields = exclude_fields or [] 75 fields = [field for field in fields if field not in exclude_fields] 76 only_read_pgen = fields == ["GT"] and variant_idxs is None and sample_idxs is None 77 78 filename_noext = str(self.filename) 79 for ext in [".pgen", ".pvar", ".pvar.zst", ".psam"]: 80 if filename_noext.endswith(ext): 81 filename_noext = filename_noext[:-len(ext)] 82 break 83 84 if only_read_pgen: 85 file_num_samples = None # Not needed for pgen 86 file_num_variants = None # Not needed 87 else: 88 pvar_extensions = [".pvar", ".pvar.zst"] 89 pvar_filename = None 90 for ext in pvar_extensions: 91 possible_pvar = filename_noext + ext 92 if os.path.exists(possible_pvar): 93 pvar_filename = possible_pvar 94 break 95 if pvar_filename is None: 96 raise FileNotFoundError(f"No .pvar or .pvar.zst file found for {filename_noext}") 97 98 log.info(f"Reading {pvar_filename}") 99 100 def open_textfile(filename): 101 if filename.endswith('.zst'): 102 import zstandard as zstd 103 return zstd.open(filename, 'rt') 104 else: 105 return open(filename, 'rt') 106 107 pvar_has_header = True 108 pvar_header_line_num = 0 109 with open_textfile(pvar_filename) as file: 110 for line_num, line in enumerate(file): 111 if line.startswith("##"): # Metadata 112 continue 113 else: 114 if separator is None: 115 separator = csv.Sniffer().sniff(file.readline()).delimiter 116 if line.startswith("#CHROM"): # Header 117 pvar_header_line_num = line_num 118 header = line.strip().split() 119 break 120 elif not line.startswith("#"): # If no header, look at line 1 121 pvar_has_header = False 122 cols_in_pvar = len(line.strip().split(separator)) 123 if cols_in_pvar == 5: 124 header = ["#CHROM", "ID", "POS", "ALT", "REF"] 125 elif cols_in_pvar == 6: 126 header = ["#CHROM", "ID", "CM", "POS", "ALT", "REF"] 127 else: 128 raise ValueError( 129 f"{pvar_filename} is not a valid pvar file." 130 ) 131 break 132 133 pvar_reading_args = { 134 'separator': separator, 135 'skip_rows': pvar_header_line_num, 136 'has_header': pvar_has_header, 137 'new_columns': None if pvar_has_header else header, 138 'schema_overrides': { 139 "#CHROM": pl.String, 140 "POS": pl.UInt32, 141 "ID": pl.String, 142 "REF": pl.String, 143 "ALT": pl.String, 144 }, 145 'null_values': ["NA"], 146 } 147 if pvar_filename.endswith('.zst'): 148 pvar = pl.read_csv(pvar_filename, **pvar_reading_args).lazy() 149 else: 150 pvar = pl.scan_csv(pvar_filename, **pvar_reading_args) 151 152 # We need to track row positions of variants but doing this with lazy operations would 153 # defeat predicate pushdown optimization and force loading the entire DataFrame. 154 # Solution: We only materialize the ID column with indices, keeping memory usage minimal 155 # while maintaining the ability to map between variant IDs and their file positions. 156 variant_id_idxs = pvar.select("ID").with_row_index().collect() 157 158 file_num_variants = variant_id_idxs.height 159 160 if variant_ids is not None: 161 num_variants = np.size(variant_ids) 162 pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect() 163 variant_idxs = variant_id_idxs.filter(pl.col("ID").is_in(variant_ids)).select("index").to_series().to_numpy() 164 elif variant_idxs is not None: 165 num_variants = np.size(variant_idxs) 166 variant_idxs = np.array(variant_idxs, dtype=np.uint32) 167 variant_ids = variant_id_idxs.filter(pl.col("index").is_in(variant_idxs)).select("ID") 168 pvar = pvar.filter(pl.col("ID").is_in(variant_ids)).collect() 169 else: 170 num_variants = file_num_variants 171 variant_idxs = np.arange(num_variants, dtype=np.uint32) 172 pvar = pvar.collect() 173 174 log.info(f"Reading {filename_noext}.psam") 175 176 with open(filename_noext + ".psam") as file: 177 first_line = file.readline().strip() 178 psam_has_header = first_line.startswith(("#FID", "FID", "#IID", "IID")) 179 180 psam = pl.read_csv( 181 filename_noext + ".psam", 182 separator=separator, 183 has_header=psam_has_header, 184 new_columns=None if psam_has_header else ["FID", "IID", "PAT", "MAT", "SEX", "PHENO1"], 185 null_values=["NA"], 186 ).with_row_index() 187 if "#IID" in psam.columns: 188 psam = psam.rename({"#IID": "IID"}) 189 if "#FID" in psam.columns: 190 psam = psam.rename({"#FID": "FID"}) 191 192 file_num_samples = psam.height 193 194 if sample_ids is not None: 195 num_samples = np.size(sample_ids) 196 psam = psam.filter(pl.col("IID").is_in(sample_ids)) 197 sample_idxs = psam.select("index").to_series().to_numpy() 198 elif sample_idxs is not None: 199 num_samples = np.size(sample_idxs) 200 sample_idxs = np.array(sample_idxs, dtype=np.uint32) 201 psam = psam.filter(pl.col("index").is_in(sample_idxs)) 202 else: 203 num_samples = file_num_samples 204 205 if "GT" in fields: 206 log.info(f"Reading {filename_noext}.pgen") 207 pgen_reader = pg.PgenReader( 208 str.encode(filename_noext + ".pgen"), 209 raw_sample_ct=file_num_samples, 210 variant_ct=file_num_variants, 211 sample_subset=sample_idxs, 212 ) 213 214 if only_read_pgen: 215 num_samples = pgen_reader.get_raw_sample_ct() 216 num_variants = pgen_reader.get_variant_ct() 217 variant_idxs = np.arange(num_variants, dtype=np.uint32) 218 219 # required arrays: variant_idxs + sample_idxs + genotypes 220 if not sum_strands: 221 required_ram = (num_samples + num_variants + num_variants * 2 * num_samples) * 4 222 else: 223 required_ram = (num_samples + num_variants) * 4 + num_variants * num_samples 224 log.info(f">{required_ram / 1024**3:.2f} GiB of RAM are required to process {num_samples} samples with {num_variants} variants each") 225 226 if not sum_strands: 227 genotypes = np.empty((num_variants, 2 * num_samples), dtype=np.int32) # cannot use int8 because of pgenlib 228 pgen_reader.read_alleles_list(variant_idxs, genotypes) 229 genotypes = genotypes.astype(np.int8).reshape((num_variants, num_samples, 2)) 230 else: 231 genotypes = np.empty((num_variants, num_samples), dtype=np.int8) 232 pgen_reader.read_list(variant_idxs, genotypes) 233 pgen_reader.close() 234 else: 235 genotypes = None 236 237 log.info("Constructing SNPObject") 238 239 snpobj = SNPObject( 240 calldata_gt=genotypes if "GT" in fields else None, 241 samples=psam.get_column("IID").to_numpy() if "IID" in fields and "IID" in psam.columns else None, 242 **{f'variants_{k.lower()}': pvar.get_column(v).to_numpy() if v in fields and v in pvar.columns else None 243 for k, v in {'ref': 'REF', 'alt': 'ALT', 'chrom': '#CHROM', 'id': 'ID', 'pos': 'POS', 'filter_pass': 'FILTER', 'qual': 'QUAL'}.items()} 244 ) 245 246 log.info("Finished constructing SNPObject") 247 return snpobj
Read a pgen fileset (pgen, psam, pvar) into a SNPObject.
Arguments:
- fields (str, None, or list of str, optional): Fields to extract data for that should be included in the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. To extract all fields, set fields to None. Defaults to None.
- exclude_fields (str, None, or list of str, optional): Fields to exclude from the returned SNPObject. Available fields are 'GT', 'IID', 'REF', 'ALT', '#CHROM', 'ID', 'POS', 'FILTER', 'QUAL'. To exclude no fields, set exclude_fields to None. Defaults to None.
- sample_ids: List of sample IDs to read. If None and sample_idxs is None, all samples are read.
- sample_idxs: List of sample indices to read. If None and sample_ids is None, all samples are read.
- variant_ids: List of variant IDs to read. If None and variant_idxs is None, all variants are read.
- variant_idxs: List of variant indices to read. If None and variant_ids is None, all variants are read.
- sum_strands: True if the maternal and paternal strands are to be summed together, False if the strands are to be stored separately. Note that due to the pgenlib backend, when sum_strands is False, 8 times as much RAM is required. Nonetheless, the calldata_gt will only be double the size.
- separator: Separator used in the pvar file. If None, the separator is automatically detected. If the automatic detection fails, please specify the separator manually.
Returns:
snpobj: SNPObject containing the data from the pgen fileset. If sum_strands is False, calldata_gt is stored as a numpy array of shape (num_variants, num_samples, 2) and dtype int8 containing 0, 1. If sum_strands is True, calldata_gt is stored as a numpy array of shape (num_variants, num_samples) and dtype int8 containing 0, 1, 2.
Raises:
- AssertionError: If both sample_idxs and sample_ids are specified.
- AssertionError: If both variant_idxs and variant_ids are specified.
18@SNPBaseReader.register 19class VCFReader(SNPBaseReader): 20 def read( 21 self, 22 fields: Optional[List[str]] = None, 23 exclude_fields: Optional[List[str]] = None, 24 rename_fields: Optional[dict] = None, 25 fills: Optional[dict] = None, 26 region: Optional[str] = None, 27 samples: Optional[List[str]] = None, 28 sum_strands: bool = False, 29 ) -> SNPObject: 30 """ 31 Read a vcf file into a SNPObject. 32 33 Args: 34 fields: Fields to extract data for. e.g., ['variants/CHROM', 'variants/POS', 35 'calldata/GT']. If you are feeling lazy, you can drop the 'variants/' 36 and 'calldata/' prefixes, in which case the fields will be matched 37 against fields declared in the VCF header, with variants taking priority 38 over calldata if a field with the same ID exists both in INFO and FORMAT 39 headers. I.e., ['CHROM', 'POS', 'DP', 'GT'] will work, although watch out 40 for fields like 'DP' which can be both INFO and FORMAT. To extract all 41 fields, provide just the string '*'. To extract all variants fields 42 (including all INFO fields) provide 'variants/*'. To extract all 43 calldata fields (i.e., defined in FORMAT headers) provide 'calldata/*'. 44 exclude_fields: Fields to exclude. E.g., for use in combination with fields='*'. 45 rename_fields: Fields to be renamed. Should be a dictionary mapping old to new names. 46 fills: Override the fill value used for empty values. Should be a dictionary 47 mapping field names to fill values. 48 region: Genomic region to extract variants for. If provided, should be a 49 tabix-style region string, which can be either just a chromosome name 50 (e.g., '2L'), or a chromosome name followed by 1-based beginning and 51 end coordinates (e.g., '2L:100000-200000'). Note that only variants 52 whose start position (POS) is within the requested range will be included. 53 This is slightly different from the default tabix behaviour, where a 54 variant (e.g., deletion) may be included if its position (POS) occurs 55 before the requested region but its reference allele overlaps the 56 region - such a variant will not be included in the data returned 57 by this function. 58 samples: Selection of samples to extract calldata for. If provided, should be 59 a list of strings giving sample identifiers. May also be a list of 60 integers giving indices of selected samples. 61 sum_strands: True if the maternal and paternal strands are to be summed together, 62 False if the strands are to be stored separately. 63 64 Returns: 65 snpobj: SNPObject containing the data from the vcf file. 66 If sum_strands is False, calldata_gt is stored as a numpy array of shape 67 (num_variants, num_samples, 2) and dtype int8 containing 0, 1. 68 If sum_strands is True, calldata_gt is stored as a numpy array of shape 69 (num_variants, num_samples) and dtype int8 containing 0, 1, 2. 70 """ 71 log.info(f"Reading {self.filename}") 72 73 vcf_dict = allel.read_vcf( 74 str(self.filename), 75 fields=fields, 76 exclude_fields=exclude_fields, 77 rename_fields=rename_fields, 78 fills=fills, 79 region=region, 80 samples=samples, 81 alt_number=1, 82 ) 83 assert vcf_dict is not None # suppress Flake8 warning 84 85 genotypes = vcf_dict["calldata/GT"].astype(np.int8) 86 if sum_strands: 87 genotypes = genotypes.sum(axis=2, dtype=np.int8) 88 89 snpobj = SNPObject( 90 calldata_gt=genotypes, 91 samples=vcf_dict["samples"], 92 variants_ref=vcf_dict["variants/REF"], 93 variants_alt=vcf_dict["variants/ALT"], 94 variants_chrom=vcf_dict["variants/CHROM"], 95 variants_filter_pass=vcf_dict["variants/FILTER_PASS"], 96 variants_id=vcf_dict["variants/ID"], 97 variants_pos=vcf_dict["variants/POS"], 98 variants_qual=vcf_dict["variants/QUAL"], 99 ) 100 101 log.info(f"Finished reading {self.filename}") 102 return snpobj
Abstract class for SNP readers.
Attributes:
- _filename: The path to the file storing SNP data.
20 def read( 21 self, 22 fields: Optional[List[str]] = None, 23 exclude_fields: Optional[List[str]] = None, 24 rename_fields: Optional[dict] = None, 25 fills: Optional[dict] = None, 26 region: Optional[str] = None, 27 samples: Optional[List[str]] = None, 28 sum_strands: bool = False, 29 ) -> SNPObject: 30 """ 31 Read a vcf file into a SNPObject. 32 33 Args: 34 fields: Fields to extract data for. e.g., ['variants/CHROM', 'variants/POS', 35 'calldata/GT']. If you are feeling lazy, you can drop the 'variants/' 36 and 'calldata/' prefixes, in which case the fields will be matched 37 against fields declared in the VCF header, with variants taking priority 38 over calldata if a field with the same ID exists both in INFO and FORMAT 39 headers. I.e., ['CHROM', 'POS', 'DP', 'GT'] will work, although watch out 40 for fields like 'DP' which can be both INFO and FORMAT. To extract all 41 fields, provide just the string '*'. To extract all variants fields 42 (including all INFO fields) provide 'variants/*'. To extract all 43 calldata fields (i.e., defined in FORMAT headers) provide 'calldata/*'. 44 exclude_fields: Fields to exclude. E.g., for use in combination with fields='*'. 45 rename_fields: Fields to be renamed. Should be a dictionary mapping old to new names. 46 fills: Override the fill value used for empty values. Should be a dictionary 47 mapping field names to fill values. 48 region: Genomic region to extract variants for. If provided, should be a 49 tabix-style region string, which can be either just a chromosome name 50 (e.g., '2L'), or a chromosome name followed by 1-based beginning and 51 end coordinates (e.g., '2L:100000-200000'). Note that only variants 52 whose start position (POS) is within the requested range will be included. 53 This is slightly different from the default tabix behaviour, where a 54 variant (e.g., deletion) may be included if its position (POS) occurs 55 before the requested region but its reference allele overlaps the 56 region - such a variant will not be included in the data returned 57 by this function. 58 samples: Selection of samples to extract calldata for. If provided, should be 59 a list of strings giving sample identifiers. May also be a list of 60 integers giving indices of selected samples. 61 sum_strands: True if the maternal and paternal strands are to be summed together, 62 False if the strands are to be stored separately. 63 64 Returns: 65 snpobj: SNPObject containing the data from the vcf file. 66 If sum_strands is False, calldata_gt is stored as a numpy array of shape 67 (num_variants, num_samples, 2) and dtype int8 containing 0, 1. 68 If sum_strands is True, calldata_gt is stored as a numpy array of shape 69 (num_variants, num_samples) and dtype int8 containing 0, 1, 2. 70 """ 71 log.info(f"Reading {self.filename}") 72 73 vcf_dict = allel.read_vcf( 74 str(self.filename), 75 fields=fields, 76 exclude_fields=exclude_fields, 77 rename_fields=rename_fields, 78 fills=fills, 79 region=region, 80 samples=samples, 81 alt_number=1, 82 ) 83 assert vcf_dict is not None # suppress Flake8 warning 84 85 genotypes = vcf_dict["calldata/GT"].astype(np.int8) 86 if sum_strands: 87 genotypes = genotypes.sum(axis=2, dtype=np.int8) 88 89 snpobj = SNPObject( 90 calldata_gt=genotypes, 91 samples=vcf_dict["samples"], 92 variants_ref=vcf_dict["variants/REF"], 93 variants_alt=vcf_dict["variants/ALT"], 94 variants_chrom=vcf_dict["variants/CHROM"], 95 variants_filter_pass=vcf_dict["variants/FILTER_PASS"], 96 variants_id=vcf_dict["variants/ID"], 97 variants_pos=vcf_dict["variants/POS"], 98 variants_qual=vcf_dict["variants/QUAL"], 99 ) 100 101 log.info(f"Finished reading {self.filename}") 102 return snpobj
Read a vcf file into a SNPObject.
Arguments:
- fields: Fields to extract data for. e.g., ['variants/CHROM', 'variants/POS', 'calldata/GT']. If you are feeling lazy, you can drop the 'variants/' and 'calldata/' prefixes, in which case the fields will be matched against fields declared in the VCF header, with variants taking priority over calldata if a field with the same ID exists both in INFO and FORMAT headers. I.e., ['CHROM', 'POS', 'DP', 'GT'] will work, although watch out for fields like 'DP' which can be both INFO and FORMAT. To extract all fields, provide just the string ''. To extract all variants fields (including all INFO fields) provide 'variants/'. To extract all calldata fields (i.e., defined in FORMAT headers) provide 'calldata/*'.
- exclude_fields: Fields to exclude. E.g., for use in combination with fields='*'.
- rename_fields: Fields to be renamed. Should be a dictionary mapping old to new names.
- fills: Override the fill value used for empty values. Should be a dictionary mapping field names to fill values.
- region: Genomic region to extract variants for. If provided, should be a tabix-style region string, which can be either just a chromosome name (e.g., '2L'), or a chromosome name followed by 1-based beginning and end coordinates (e.g., '2L:100000-200000'). Note that only variants whose start position (POS) is within the requested range will be included. This is slightly different from the default tabix behaviour, where a variant (e.g., deletion) may be included if its position (POS) occurs before the requested region but its reference allele overlaps the region - such a variant will not be included in the data returned by this function.
- samples: Selection of samples to extract calldata for. If provided, should be a list of strings giving sample identifiers. May also be a list of integers giving indices of selected samples.
- sum_strands: True if the maternal and paternal strands are to be summed together,
- False if the strands are to be stored separately.
Returns:
snpobj: SNPObject containing the data from the vcf file. If sum_strands is False, calldata_gt is stored as a numpy array of shape (num_variants, num_samples, 2) and dtype int8 containing 0, 1. If sum_strands is True, calldata_gt is stored as a numpy array of shape (num_variants, num_samples) and dtype int8 containing 0, 1, 2.
14class BEDWriter: 15 """Writes an object in bed/bim/fam formats in the specified output path. 16 17 Args: 18 snpobj: The SNPObject to be written. 19 file: The output file path. 20 21 """ 22 23 def __init__(self, snpobj: SNPObject, filename: str): 24 self.__snpobj = snpobj.copy() 25 self.__filename = Path(filename) 26 27 def write( 28 self, 29 rename_missing_values: bool = True, 30 before: Union[int, float, str] = -1, 31 after: Union[int, float, str] = '.' 32 ): 33 """ 34 Writes the SNPObject to bed/bim/fam formats. 35 36 Args: 37 rename_missing_values (bool, optional): 38 If True, renames potential missing values in `snpobj.calldata_gt` before writing. 39 Defaults to True. 40 before (int, float, or str, default=-1): 41 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 42 Default is -1. 43 after (int, float, or str, default='.'): 44 The value that will replace `before`. Default is '.'. 45 """ 46 # Save .bed file 47 if self.__filename.suffix != '.bed': 48 self.__filename = self.__filename.with_suffix('.bed') 49 50 log.info(f"Writing .bed file: {self.__filename}") 51 52 # Optionally rename potential missing values in `snpobj.calldata_gt` before writing 53 if rename_missing_values: 54 self.__snpobj.rename_missings(before=before, after=after, inplace=True) 55 56 # If the input matrix has three dimensions, it indicates that the data is divided into two strands. 57 if len(self.__snpobj.calldata_gt.shape) == 3: 58 # Sum the two strands 59 self.__snpobj.calldata_gt = self.__snpobj.calldata_gt.transpose(1, 0, 2).sum(axis=2, dtype=np.int8) 60 61 # Infer the number of samples and variants from the matrix 62 samples, variants = self.__snpobj.calldata_gt.shape 63 64 # Define the PgenWriter to save the data 65 data_save = pg.PgenWriter(filename=str(self.__filename).encode('utf-8'), 66 sample_ct=samples, 67 variant_ct=variants, 68 nonref_flags=True, 69 hardcall_phase_present=False, 70 dosage_present=True, 71 dosage_phase_present=False) 72 73 # Fill the data_save object with the matrix of individuals x variants 74 for snp_i in range(0, variants): 75 data_save.append_biallelic(np.ascontiguousarray(self.__snpobj.calldata_gt[:, snp_i])) 76 77 # Save the .bed file 78 data_save.close() 79 80 log.info(f"Finished writing .bed file: {self.__filename}") 81 82 # Remove .bed from the file name 83 if self.__filename.suffix == '.bed': 84 self.__filename = self.__filename.with_suffix('') 85 86 # Save .fam file 87 log.info(f"Writing .fam file: {self.__filename}") 88 89 # Fill .fam file 90 fam_file = pd.DataFrame(columns=['fid', 'iid', 'father', 'mother', 'gender', 'trait']) 91 fam_file['iid'] = self.__snpobj.samples 92 fam_file['fid'] = self.__snpobj.samples 93 94 # Save .fam file 95 fam_file.to_csv(self.__filename.with_suffix('.fam'), sep='\t', index=False, header=False) 96 log.info(f"Finished writing .fam file: {self.__filename}") 97 98 # Save .bim file 99 log.info(f"Writing .bim file: {self.__filename}") 100 101 # Fill .bim file 102 bim_file = pd.DataFrame(columns=['chrom', 'snp', 'cm', 'pos', 'a0', 'a1']) 103 bim_file['chrom'] = self.__snpobj.variants_chrom 104 bim_file['snp'] = self.__snpobj.variants_id 105 bim_file['cm'] = 0 # TODO: read, save and write too if available? 106 log.warning("The .bim file is being saved with 0 cM values.") 107 bim_file['pos'] = self.__snpobj.variants_pos 108 bim_file['a0'] = self.__snpobj.variants_alt 109 bim_file['a1'] = self.__snpobj.variants_ref 110 111 # Save .bim file 112 bim_file.to_csv(self.__filename.with_suffix('.bim'), sep='\t', index=False, header=False) 113 log.info(f"Finished writing .bim file: {self.__filename}")
Writes an object in bed/bim/fam formats in the specified output path.
Arguments:
- snpobj: The SNPObject to be written.
- file: The output file path.
27 def write( 28 self, 29 rename_missing_values: bool = True, 30 before: Union[int, float, str] = -1, 31 after: Union[int, float, str] = '.' 32 ): 33 """ 34 Writes the SNPObject to bed/bim/fam formats. 35 36 Args: 37 rename_missing_values (bool, optional): 38 If True, renames potential missing values in `snpobj.calldata_gt` before writing. 39 Defaults to True. 40 before (int, float, or str, default=-1): 41 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 42 Default is -1. 43 after (int, float, or str, default='.'): 44 The value that will replace `before`. Default is '.'. 45 """ 46 # Save .bed file 47 if self.__filename.suffix != '.bed': 48 self.__filename = self.__filename.with_suffix('.bed') 49 50 log.info(f"Writing .bed file: {self.__filename}") 51 52 # Optionally rename potential missing values in `snpobj.calldata_gt` before writing 53 if rename_missing_values: 54 self.__snpobj.rename_missings(before=before, after=after, inplace=True) 55 56 # If the input matrix has three dimensions, it indicates that the data is divided into two strands. 57 if len(self.__snpobj.calldata_gt.shape) == 3: 58 # Sum the two strands 59 self.__snpobj.calldata_gt = self.__snpobj.calldata_gt.transpose(1, 0, 2).sum(axis=2, dtype=np.int8) 60 61 # Infer the number of samples and variants from the matrix 62 samples, variants = self.__snpobj.calldata_gt.shape 63 64 # Define the PgenWriter to save the data 65 data_save = pg.PgenWriter(filename=str(self.__filename).encode('utf-8'), 66 sample_ct=samples, 67 variant_ct=variants, 68 nonref_flags=True, 69 hardcall_phase_present=False, 70 dosage_present=True, 71 dosage_phase_present=False) 72 73 # Fill the data_save object with the matrix of individuals x variants 74 for snp_i in range(0, variants): 75 data_save.append_biallelic(np.ascontiguousarray(self.__snpobj.calldata_gt[:, snp_i])) 76 77 # Save the .bed file 78 data_save.close() 79 80 log.info(f"Finished writing .bed file: {self.__filename}") 81 82 # Remove .bed from the file name 83 if self.__filename.suffix == '.bed': 84 self.__filename = self.__filename.with_suffix('') 85 86 # Save .fam file 87 log.info(f"Writing .fam file: {self.__filename}") 88 89 # Fill .fam file 90 fam_file = pd.DataFrame(columns=['fid', 'iid', 'father', 'mother', 'gender', 'trait']) 91 fam_file['iid'] = self.__snpobj.samples 92 fam_file['fid'] = self.__snpobj.samples 93 94 # Save .fam file 95 fam_file.to_csv(self.__filename.with_suffix('.fam'), sep='\t', index=False, header=False) 96 log.info(f"Finished writing .fam file: {self.__filename}") 97 98 # Save .bim file 99 log.info(f"Writing .bim file: {self.__filename}") 100 101 # Fill .bim file 102 bim_file = pd.DataFrame(columns=['chrom', 'snp', 'cm', 'pos', 'a0', 'a1']) 103 bim_file['chrom'] = self.__snpobj.variants_chrom 104 bim_file['snp'] = self.__snpobj.variants_id 105 bim_file['cm'] = 0 # TODO: read, save and write too if available? 106 log.warning("The .bim file is being saved with 0 cM values.") 107 bim_file['pos'] = self.__snpobj.variants_pos 108 bim_file['a0'] = self.__snpobj.variants_alt 109 bim_file['a1'] = self.__snpobj.variants_ref 110 111 # Save .bim file 112 bim_file.to_csv(self.__filename.with_suffix('.bim'), sep='\t', index=False, header=False) 113 log.info(f"Finished writing .bim file: {self.__filename}")
Writes the SNPObject to bed/bim/fam formats.
Arguments:
- rename_missing_values (bool, optional): If True, renames potential missing values in
snpobj.calldata_gt
before writing. Defaults to True. - before (int, float, or str, default=-1): The current representation of missing values in
calldata_gt
. Common values might be -1, '.', or NaN. Default is -1. - after (int, float, or str, default='.'): The value that will replace
before
. Default is '.'.
15class PGENWriter: 16 """ 17 Writes a genotype object in PGEN format (.pgen, .psam, and .pvar files) in the specified output path. 18 """ 19 20 def __init__(self, snpobj: SNPObject, filename: str): 21 """ 22 Initializes the PGENWriter instance. 23 24 Args: 25 snpobj (SNPObject): The SNPObject containing genotype data to be written. 26 filename (str): Base path for the output files (excluding extension). 27 """ 28 self.__snpobj = snpobj 29 self.__filename = Path(filename) 30 31 def write( 32 self, 33 vzs: bool = False, 34 rename_missing_values: bool = True, 35 before: Union[int, float, str] = -1, 36 after: Union[int, float, str] = '.' 37 ): 38 """ 39 Writes the SNPObject data to .pgen, .psam, and .pvar files. 40 41 Args: 42 vzs (bool, optional): 43 If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False. 44 rename_missing_values (bool, optional): 45 If True, renames potential missing values in `snpobj.calldata_gt` before writing. 46 Defaults to True. 47 before (int, float, or str, default=-1): 48 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 49 Default is -1. 50 after (int, float, or str, default='.'): 51 The value that will replace `before`. Default is '.'. 52 """ 53 file_extensions = (".pgen", ".psam", ".pvar", ".pvar.zst") 54 if self.__filename.suffix in file_extensions: 55 self.__filename = self.__filename.with_suffix('') 56 57 # Optionally rename potential missing values in `snpobj.calldata_gt` before writing 58 if rename_missing_values: 59 self.__snpobj.rename_missings(before=before, after=after, inplace=True) 60 61 self.write_pvar(vzs=vzs) 62 self.write_psam() 63 self.write_pgen() 64 65 def write_pvar(self, vzs: bool = False): 66 """ 67 Writes variant data to the .pvar file. 68 69 Args: 70 vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False. 71 """ 72 output_filename = f"{self.__filename}.pvar" 73 if vzs: 74 output_filename += ".zst" 75 log.info(f"Writing to {output_filename} (compressed)") 76 else: 77 log.info(f"Writing to {output_filename}") 78 79 df = pl.DataFrame( 80 { 81 "#CHROM": self.__snpobj.variants_chrom, 82 "POS": self.__snpobj.variants_pos, 83 "ID": self.__snpobj.variants_id, 84 "REF": self.__snpobj.variants_ref, 85 "ALT": self.__snpobj.variants_alt, 86 "FILTER": self.__snpobj.variants_filter_pass, 87 # TODO: add INFO column to SNPObject and write it to the .pvar file? (if not it's lost) 88 } 89 ) 90 # TODO: add header to the .pvar file, if not it's lost 91 92 # Write the DataFrame to a CSV string 93 csv_data = df.write_csv(None, separator="\t") 94 95 if vzs: 96 # Compress the CSV data using zstd 97 cctx = zstd.ZstdCompressor() 98 compressed_data = cctx.compress(csv_data.encode('utf-8')) 99 with open(output_filename, 'wb') as f: 100 f.write(compressed_data) 101 else: 102 with open(output_filename, 'w') as f: 103 f.write(csv_data) 104 105 def write_psam(self): 106 """ 107 Writes sample metadata to the .psam file. 108 """ 109 log.info(f"Writing {self.__filename}.psam") 110 df = pl.DataFrame( 111 { 112 "IID": self.__snpobj.samples, 113 "SEX": "NA", # Add SEX as nan for now 114 # TODO: add SEX as Optional column to SNPObject and write it to the .psam file (if not it's lost) 115 } 116 ) 117 df.write_csv(f"{self.__filename}.psam", separator="\t") 118 119 def write_pgen(self): 120 """ 121 Writes the genotype data to a .pgen file. 122 """ 123 log.info(f"Writing to {self.__filename}.pgen") 124 summed_strands = False if self.__snpobj.calldata_gt.ndim == 3 else True 125 if not summed_strands: 126 num_variants, num_samples, num_alleles = self.__snpobj.calldata_gt.shape 127 # Flatten the genotype matrix for pgenlib 128 flat_genotypes = self.__snpobj.calldata_gt.reshape( 129 num_variants, num_samples * num_alleles 130 ) 131 with pg.PgenWriter( 132 filename=f"{self.__filename}.pgen".encode('utf-8'), 133 sample_ct=num_samples, 134 variant_ct=num_variants, 135 hardcall_phase_present=True, 136 ) as writer: 137 for variant_index in range(num_variants): 138 writer.append_alleles( 139 flat_genotypes[variant_index].astype(np.int32), all_phased=True 140 ) 141 else: 142 num_variants, num_samples = self.__snpobj.calldata_gt.shape 143 # Transpose to (samples, variants) 144 genotypes = self.__snpobj.calldata_gt.T # Shape is (samples, variants) 145 with pg.PgenWriter( 146 filename=f"{self.__filename}.pgen".encode('utf-8'), 147 sample_ct=num_samples, 148 variant_ct=num_variants, 149 hardcall_phase_present=False, 150 ) as writer: 151 for variant_index in range(num_variants): 152 variant_genotypes = genotypes[:, variant_index].astype(np.int8) 153 # Map missing genotypes to -9 if necessary 154 variant_genotypes[variant_genotypes == -1] = -9 155 writer.append_biallelic(np.ascontiguousarray(variant_genotypes))
Writes a genotype object in PGEN format (.pgen, .psam, and .pvar files) in the specified output path.
20 def __init__(self, snpobj: SNPObject, filename: str): 21 """ 22 Initializes the PGENWriter instance. 23 24 Args: 25 snpobj (SNPObject): The SNPObject containing genotype data to be written. 26 filename (str): Base path for the output files (excluding extension). 27 """ 28 self.__snpobj = snpobj 29 self.__filename = Path(filename)
Initializes the PGENWriter instance.
Arguments:
- snpobj (SNPObject): The SNPObject containing genotype data to be written.
- filename (str): Base path for the output files (excluding extension).
31 def write( 32 self, 33 vzs: bool = False, 34 rename_missing_values: bool = True, 35 before: Union[int, float, str] = -1, 36 after: Union[int, float, str] = '.' 37 ): 38 """ 39 Writes the SNPObject data to .pgen, .psam, and .pvar files. 40 41 Args: 42 vzs (bool, optional): 43 If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False. 44 rename_missing_values (bool, optional): 45 If True, renames potential missing values in `snpobj.calldata_gt` before writing. 46 Defaults to True. 47 before (int, float, or str, default=-1): 48 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 49 Default is -1. 50 after (int, float, or str, default='.'): 51 The value that will replace `before`. Default is '.'. 52 """ 53 file_extensions = (".pgen", ".psam", ".pvar", ".pvar.zst") 54 if self.__filename.suffix in file_extensions: 55 self.__filename = self.__filename.with_suffix('') 56 57 # Optionally rename potential missing values in `snpobj.calldata_gt` before writing 58 if rename_missing_values: 59 self.__snpobj.rename_missings(before=before, after=after, inplace=True) 60 61 self.write_pvar(vzs=vzs) 62 self.write_psam() 63 self.write_pgen()
Writes the SNPObject data to .pgen, .psam, and .pvar files.
Arguments:
- vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
- rename_missing_values (bool, optional): If True, renames potential missing values in
snpobj.calldata_gt
before writing. Defaults to True. - before (int, float, or str, default=-1): The current representation of missing values in
calldata_gt
. Common values might be -1, '.', or NaN. Default is -1. - after (int, float, or str, default='.'): The value that will replace
before
. Default is '.'.
65 def write_pvar(self, vzs: bool = False): 66 """ 67 Writes variant data to the .pvar file. 68 69 Args: 70 vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False. 71 """ 72 output_filename = f"{self.__filename}.pvar" 73 if vzs: 74 output_filename += ".zst" 75 log.info(f"Writing to {output_filename} (compressed)") 76 else: 77 log.info(f"Writing to {output_filename}") 78 79 df = pl.DataFrame( 80 { 81 "#CHROM": self.__snpobj.variants_chrom, 82 "POS": self.__snpobj.variants_pos, 83 "ID": self.__snpobj.variants_id, 84 "REF": self.__snpobj.variants_ref, 85 "ALT": self.__snpobj.variants_alt, 86 "FILTER": self.__snpobj.variants_filter_pass, 87 # TODO: add INFO column to SNPObject and write it to the .pvar file? (if not it's lost) 88 } 89 ) 90 # TODO: add header to the .pvar file, if not it's lost 91 92 # Write the DataFrame to a CSV string 93 csv_data = df.write_csv(None, separator="\t") 94 95 if vzs: 96 # Compress the CSV data using zstd 97 cctx = zstd.ZstdCompressor() 98 compressed_data = cctx.compress(csv_data.encode('utf-8')) 99 with open(output_filename, 'wb') as f: 100 f.write(compressed_data) 101 else: 102 with open(output_filename, 'w') as f: 103 f.write(csv_data)
Writes variant data to the .pvar file.
Arguments:
- vzs (bool, optional): If True, compresses the .pvar file using zstd and saves it as .pvar.zst. Defaults to False.
105 def write_psam(self): 106 """ 107 Writes sample metadata to the .psam file. 108 """ 109 log.info(f"Writing {self.__filename}.psam") 110 df = pl.DataFrame( 111 { 112 "IID": self.__snpobj.samples, 113 "SEX": "NA", # Add SEX as nan for now 114 # TODO: add SEX as Optional column to SNPObject and write it to the .psam file (if not it's lost) 115 } 116 ) 117 df.write_csv(f"{self.__filename}.psam", separator="\t")
Writes sample metadata to the .psam file.
119 def write_pgen(self): 120 """ 121 Writes the genotype data to a .pgen file. 122 """ 123 log.info(f"Writing to {self.__filename}.pgen") 124 summed_strands = False if self.__snpobj.calldata_gt.ndim == 3 else True 125 if not summed_strands: 126 num_variants, num_samples, num_alleles = self.__snpobj.calldata_gt.shape 127 # Flatten the genotype matrix for pgenlib 128 flat_genotypes = self.__snpobj.calldata_gt.reshape( 129 num_variants, num_samples * num_alleles 130 ) 131 with pg.PgenWriter( 132 filename=f"{self.__filename}.pgen".encode('utf-8'), 133 sample_ct=num_samples, 134 variant_ct=num_variants, 135 hardcall_phase_present=True, 136 ) as writer: 137 for variant_index in range(num_variants): 138 writer.append_alleles( 139 flat_genotypes[variant_index].astype(np.int32), all_phased=True 140 ) 141 else: 142 num_variants, num_samples = self.__snpobj.calldata_gt.shape 143 # Transpose to (samples, variants) 144 genotypes = self.__snpobj.calldata_gt.T # Shape is (samples, variants) 145 with pg.PgenWriter( 146 filename=f"{self.__filename}.pgen".encode('utf-8'), 147 sample_ct=num_samples, 148 variant_ct=num_variants, 149 hardcall_phase_present=False, 150 ) as writer: 151 for variant_index in range(num_variants): 152 variant_genotypes = genotypes[:, variant_index].astype(np.int8) 153 # Map missing genotypes to -9 if necessary 154 variant_genotypes[variant_genotypes == -1] = -9 155 writer.append_biallelic(np.ascontiguousarray(variant_genotypes))
Writes the genotype data to a .pgen file.
14class VCFWriter: 15 """Writes a phenotype object in VCF format in the specified output path.""" 16 17 def __init__(self, snpobj: SNPObject, filename: str, n_jobs: int = -1, phased: bool = False): 18 """ 19 Parameters 20 ---------- 21 snpobj : ``SNPObject`` instance 22 A SNPObject instance. 23 file : str 24 Path to write VCF file. 25 n_jobs : int, default=-1 26 Number of jobs to run in parallel. None means 1 unless in a 27 joblib.parallel_backend context. -1 means using all processors. 28 phased : bool, default=True 29 If True, the genotype data is written in the "maternal|paternal" 30 format. If False, the genotype data is written in the 31 "maternal/paternal" format. 32 """ 33 self.__snpobj = snpobj 34 self.__filename = Path(filename) 35 self.__n_jobs = n_jobs 36 self.__phased = phased 37 38 def write( 39 self, 40 chrom_partition: bool = False, 41 rename_missing_values: bool = True, 42 before: Union[int, float, str] = -1, 43 after: Union[int, float, str] = '.' 44 ): 45 """ 46 Writes the SNP data to VCF file(s). 47 48 Args: 49 chrom_partition (bool, optional): 50 If True, individual VCF files are generated for each chromosome. 51 If False, a single VCF file containing data for all chromosomes is created. Defaults to False. 52 rename_missing_values (bool, optional): 53 If True, renames potential missing values in `snpobj.calldata_gt` before writing. 54 Defaults to True. 55 before (int, float, or str, default=-1): 56 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 57 Default is -1. 58 after (int, float, or str, default='.'): 59 The value that will replace `before`. Default is '.'. 60 """ 61 self.__chrom_partition = chrom_partition 62 63 file_extensions = (".vcf", ".bcf") 64 if self.__filename.suffix in file_extensions: 65 self.__file_extension = self.__filename.suffix 66 self.__filename = self.__filename.with_suffix('') 67 else: 68 self.__file_extension = ".vcf" 69 70 # Optionally rename potential missing values in `snpobj.calldata_gt` before writing 71 if rename_missing_values: 72 self.__snpobj.rename_missings(before=before, after=after, inplace=True) 73 74 data = self.__snpobj 75 76 if self.__chrom_partition: 77 chroms = data.unique_chrom 78 79 for chrom in chroms: 80 # Filter to include the data for the chromosome in particular 81 data_chrom = data.filter_variants(chrom=chrom, inplace=False) 82 83 log.debug(f'Storing chromosome {chrom}') 84 self.write_chromosome_data(chrom, data_chrom) 85 else: 86 self.write_chromosome_data("All", data) 87 88 def write_chromosome_data(self, chrom, data_chrom): 89 """ 90 Writes the SNP data for a specific chromosome to a VCF file. 91 92 Args: 93 chrom: The chromosome name. 94 data_chrom: The SNPObject instance containing the data for the chromosome. 95 """ 96 # Obtain npy matrix with SNPs 97 npy = data_chrom.calldata_gt 98 length, n_samples, num_strands = npy.shape 99 npy = npy.reshape(length, n_samples*2).T 100 101 # Keep sample names if appropriate 102 data_samples = data_chrom.samples if len(data_chrom.samples) == n_samples else [get_name() for _ in range(n_samples)] 103 104 # Metadata 105 df = pd.DataFrame({ 106 "CHROM": data_chrom.variants_chrom, 107 "POS": data_chrom.variants_pos, 108 "ID": data_chrom.variants_id, 109 "REF": data_chrom.variants_ref, 110 "ALT": data_chrom.variants_alt, 111 "QUAL": data_chrom.variants_qual, 112 "FILTER": ["PASS"] * length, 113 "INFO": ["."] * length, 114 "FORMAT": ["GT"] * length 115 }) 116 117 # Genotype data for each sample "maternal|paternal" 118 column_data = joblib.Parallel(n_jobs=self.__n_jobs)( 119 joblib.delayed(process_genotype)(npy, i, length, self.__phased) for i in range(n_samples) 120 ) 121 122 # Create the DataFrame once with all the columns 123 column_data = pd.DataFrame(np.array(column_data).T, columns=data_samples) 124 df = pd.concat([df, column_data], axis=1) 125 126 # Format output file 127 if chrom == "All": 128 file = self.__filename.with_suffix(self.__file_extension) 129 else: 130 file = self.__filename.parent / f"{self.__filename.stem}_{chrom}{self.__file_extension}" 131 132 # Write header 133 with open(file, "w") as f: 134 f.write("".join([ 135 "##fileformat=VCFv4.1\n", 136 '##FORMAT=<ID=GT,Number=1,Type=String,Description="Phased Genotype">\n', 137 *[f"##contig=<ID={chrom}>\n" for chrom in df["CHROM"].unique()], 138 "#" + "\t".join(map(str, df.columns)) + "\n" 139 ])) 140 141 # Write genotype data 142 df.to_csv(file, sep="\t", index=False, mode="a", header=False)
Writes a phenotype object in VCF format in the specified output path.
17 def __init__(self, snpobj: SNPObject, filename: str, n_jobs: int = -1, phased: bool = False): 18 """ 19 Parameters 20 ---------- 21 snpobj : ``SNPObject`` instance 22 A SNPObject instance. 23 file : str 24 Path to write VCF file. 25 n_jobs : int, default=-1 26 Number of jobs to run in parallel. None means 1 unless in a 27 joblib.parallel_backend context. -1 means using all processors. 28 phased : bool, default=True 29 If True, the genotype data is written in the "maternal|paternal" 30 format. If False, the genotype data is written in the 31 "maternal/paternal" format. 32 """ 33 self.__snpobj = snpobj 34 self.__filename = Path(filename) 35 self.__n_jobs = n_jobs 36 self.__phased = phased
Parameters
snpobj : SNPObject
instance
A SNPObject instance.
file : str
Path to write VCF file.
n_jobs : int, default=-1
Number of jobs to run in parallel. None means 1 unless in a
joblib.parallel_backend context. -1 means using all processors.
phased : bool, default=True
If True, the genotype data is written in the "maternal|paternal"
format. If False, the genotype data is written in the
"maternal/paternal" format.
38 def write( 39 self, 40 chrom_partition: bool = False, 41 rename_missing_values: bool = True, 42 before: Union[int, float, str] = -1, 43 after: Union[int, float, str] = '.' 44 ): 45 """ 46 Writes the SNP data to VCF file(s). 47 48 Args: 49 chrom_partition (bool, optional): 50 If True, individual VCF files are generated for each chromosome. 51 If False, a single VCF file containing data for all chromosomes is created. Defaults to False. 52 rename_missing_values (bool, optional): 53 If True, renames potential missing values in `snpobj.calldata_gt` before writing. 54 Defaults to True. 55 before (int, float, or str, default=-1): 56 The current representation of missing values in `calldata_gt`. Common values might be -1, '.', or NaN. 57 Default is -1. 58 after (int, float, or str, default='.'): 59 The value that will replace `before`. Default is '.'. 60 """ 61 self.__chrom_partition = chrom_partition 62 63 file_extensions = (".vcf", ".bcf") 64 if self.__filename.suffix in file_extensions: 65 self.__file_extension = self.__filename.suffix 66 self.__filename = self.__filename.with_suffix('') 67 else: 68 self.__file_extension = ".vcf" 69 70 # Optionally rename potential missing values in `snpobj.calldata_gt` before writing 71 if rename_missing_values: 72 self.__snpobj.rename_missings(before=before, after=after, inplace=True) 73 74 data = self.__snpobj 75 76 if self.__chrom_partition: 77 chroms = data.unique_chrom 78 79 for chrom in chroms: 80 # Filter to include the data for the chromosome in particular 81 data_chrom = data.filter_variants(chrom=chrom, inplace=False) 82 83 log.debug(f'Storing chromosome {chrom}') 84 self.write_chromosome_data(chrom, data_chrom) 85 else: 86 self.write_chromosome_data("All", data)
Writes the SNP data to VCF file(s).
Arguments:
- chrom_partition (bool, optional): If True, individual VCF files are generated for each chromosome. If False, a single VCF file containing data for all chromosomes is created. Defaults to False.
- rename_missing_values (bool, optional): If True, renames potential missing values in
snpobj.calldata_gt
before writing. Defaults to True. - before (int, float, or str, default=-1): The current representation of missing values in
calldata_gt
. Common values might be -1, '.', or NaN. Default is -1. - after (int, float, or str, default='.'): The value that will replace
before
. Default is '.'.
88 def write_chromosome_data(self, chrom, data_chrom): 89 """ 90 Writes the SNP data for a specific chromosome to a VCF file. 91 92 Args: 93 chrom: The chromosome name. 94 data_chrom: The SNPObject instance containing the data for the chromosome. 95 """ 96 # Obtain npy matrix with SNPs 97 npy = data_chrom.calldata_gt 98 length, n_samples, num_strands = npy.shape 99 npy = npy.reshape(length, n_samples*2).T 100 101 # Keep sample names if appropriate 102 data_samples = data_chrom.samples if len(data_chrom.samples) == n_samples else [get_name() for _ in range(n_samples)] 103 104 # Metadata 105 df = pd.DataFrame({ 106 "CHROM": data_chrom.variants_chrom, 107 "POS": data_chrom.variants_pos, 108 "ID": data_chrom.variants_id, 109 "REF": data_chrom.variants_ref, 110 "ALT": data_chrom.variants_alt, 111 "QUAL": data_chrom.variants_qual, 112 "FILTER": ["PASS"] * length, 113 "INFO": ["."] * length, 114 "FORMAT": ["GT"] * length 115 }) 116 117 # Genotype data for each sample "maternal|paternal" 118 column_data = joblib.Parallel(n_jobs=self.__n_jobs)( 119 joblib.delayed(process_genotype)(npy, i, length, self.__phased) for i in range(n_samples) 120 ) 121 122 # Create the DataFrame once with all the columns 123 column_data = pd.DataFrame(np.array(column_data).T, columns=data_samples) 124 df = pd.concat([df, column_data], axis=1) 125 126 # Format output file 127 if chrom == "All": 128 file = self.__filename.with_suffix(self.__file_extension) 129 else: 130 file = self.__filename.parent / f"{self.__filename.stem}_{chrom}{self.__file_extension}" 131 132 # Write header 133 with open(file, "w") as f: 134 f.write("".join([ 135 "##fileformat=VCFv4.1\n", 136 '##FORMAT=<ID=GT,Number=1,Type=String,Description="Phased Genotype">\n', 137 *[f"##contig=<ID={chrom}>\n" for chrom in df["CHROM"].unique()], 138 "#" + "\t".join(map(str, df.columns)) + "\n" 139 ])) 140 141 # Write genotype data 142 df.to_csv(file, sep="\t", index=False, mode="a", header=False)
Writes the SNP data for a specific chromosome to a VCF file.
Arguments:
- chrom: The chromosome name.
- data_chrom: The SNPObject instance containing the data for the chromosome.