snputils.datasets.load_dataset
import contextlib
import logging
import tempfile
from pathlib import Path
from typing import List, Optional, Union

from snputils.snp.genobj.snpobj import SNPObject
from snputils.snp.io.read.pgen import PGENReader
from snputils._utils.data_home import get_data_home
from snputils._utils.plink import execute_plink_cmd
from snputils._utils.download import download_url

log = logging.getLogger(__name__)

# Base download URL for each supported dataset.
base_urls = {
    "1kgp": "https://ftp.1000genomes.ebi.ac.uk/vol1/ftp/data_collections/1000_genomes_project/release/20181203_biallelic_SNV"
}

# Autosomes 1-22 plus the sex chromosomes.
chr_list = list(range(1, 23)) + ["X", "Y"]
# Per-chromosome VCF file name for each dataset; keys are "1".."22", "X", "Y".
chr_urls = {
    "1kgp": {
        str(i): f"ALL.chr{i}.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz" for i in chr_list
    }
}


def available_datasets_list() -> List[str]:
    """
    Get the list of available datasets.
    """
    return list(base_urls.keys())


def load_dataset(
    name: str,
    chromosomes: Union[List[str], List[int], str, int],
    variants_ids: Optional[List[str]] = None,
    sample_ids: Optional[List[str]] = None,
    verbose: bool = True,
    **read_kwargs
) -> SNPObject:
    """
    Load a genome dataset.

    Args:
        name (str): Name of the dataset to load. Call `available_datasets_list()` to get the list of available datasets.
        chromosomes (List[str] | List[int] | str | int): Chromosomes to load.
        variants_ids (List[str]): List of variant IDs to load.
        sample_ids (List[str]): List of sample IDs to load.
        verbose (bool): Whether to show progress.
        **read_kwargs: Keyword arguments to pass to `PGENReader.read()`.

    Returns:
        SNPObject: SNPObject containing the loaded dataset.

    Raises:
        ValueError: If `chromosomes` is empty.
        NotImplementedError: If `name` is not a known dataset.
    """
    if isinstance(chromosomes, (str, int)):
        chromosomes = [chromosomes]
    # Normalize to the keys used in chr_urls ("1".."22", "X", "Y"). The final
    # .upper() maps "x"/"chrx" to the uppercase "X"/"Y" keys; without it the
    # sex chromosomes raised KeyError because .lower() produced "x"/"y".
    chromosomes = [str(c).lower().replace("chr", "").upper() for c in chromosomes]
    if not chromosomes:
        # Without this guard the single-chromosome rename below would hit an
        # unbound `out_file` (NameError) instead of a clear error.
        raise ValueError("At least one chromosome must be specified.")

    # ExitStack guarantees every temporary file is closed (and thus deleted)
    # even when a download or PLINK invocation raises part-way through.
    with contextlib.ExitStack() as stack:
        variants_ids_txt = None
        if variants_ids is not None:
            # One variant ID per line, consumed by PLINK --extract.
            variants_ids_txt = stack.enter_context(tempfile.NamedTemporaryFile(mode='w'))
            variants_ids_txt.write("\n".join(variants_ids))
            variants_ids_txt.flush()

        sample_ids_txt = None
        if sample_ids is not None:
            # One sample ID per line, consumed by PLINK --keep.
            sample_ids_txt = stack.enter_context(tempfile.NamedTemporaryFile(mode='w'))
            sample_ids_txt.write("\n".join(sample_ids))
            sample_ids_txt.flush()

        # Per-chromosome PGEN prefixes, consumed by PLINK --pmerge-list.
        merge_list_txt = stack.enter_context(tempfile.NamedTemporaryFile(mode='w'))

        data_home = get_data_home()

        if name == "1kgp":
            data_path = data_home / name
            data_path.mkdir(parents=True, exist_ok=True)
            for chrom in chromosomes:
                chr_path = data_path / chr_urls[name][chrom]
                if not chr_path.exists():
                    log.info(f"Downloading chromosome {chrom}...")
                    download_url(f"{base_urls[name]}/{chr_urls[name][chrom]}", chr_path, show_progress=verbose)
                else:
                    log.info(f"Chromosome {chrom} already exists. Skipping download.")

                # Filter (optionally by sample/variant IDs) and convert the
                # downloaded VCF into a per-chromosome PGEN fileset.
                log.info(f"Processing chromosome {chrom}...")
                out_file = chr_urls[name][chrom].replace('.vcf.gz', '')
                execute_plink_cmd(
                    ["--vcf", f"{chr_urls[name][chrom]}"]
                    + (["--keep", sample_ids_txt.name] if sample_ids is not None else [])
                    + (["--extract", variants_ids_txt.name] if variants_ids is not None else [])
                    + [
                        "--set-missing-var-ids", "@:#",
                        "--make-pgen",
                        "--out", out_file,
                    ], cwd=data_path)
                merge_list_txt.write(f"{out_file}\n")

            if len(chromosomes) > 1:
                # Merge the per-chromosome PGEN files into a single fileset.
                log.info("Merging PGEN files...")
                merge_list_txt.flush()
                # Was a stray debug print() that also leaked an open file handle.
                log.debug("Merge list file contents: %s", Path(merge_list_txt.name).read_text())
                execute_plink_cmd(["--pmerge-list", merge_list_txt.name, "--make-pgen", "--out", "1kgp"],
                                  cwd=data_path)
            else:
                # Single chromosome: just rename the fileset to the final prefix.
                for ext in ["pgen", "psam", "pvar"]:
                    Path(data_path / f"{out_file}.{ext}").rename(data_path / f"1kgp.{ext}")

            # Read the resulting PGEN fileset into an SNPObject.
            log.info("Reading PGEN fileset...")
            snpobj = PGENReader(data_path / "1kgp").read(**read_kwargs)
        else:
            raise NotImplementedError(f"Dataset {name} not implemented.")

    return snpobj
log =
<Logger snputils.datasets.load_dataset (WARNING)>
base_urls =
{'1kgp': 'https://ftp.1000genomes.ebi.ac.uk/vol1/ftp/data_collections/1000_genomes_project/release/20181203_biallelic_SNV'}
chr_list =
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 'X', 'Y']
chr_urls =
{'1kgp': {'1': 'ALL.chr1.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '2': 'ALL.chr2.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '3': 'ALL.chr3.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '4': 'ALL.chr4.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '5': 'ALL.chr5.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '6': 'ALL.chr6.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '7': 'ALL.chr7.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '8': 'ALL.chr8.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '9': 'ALL.chr9.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '10': 'ALL.chr10.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '11': 'ALL.chr11.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '12': 'ALL.chr12.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '13': 'ALL.chr13.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '14': 'ALL.chr14.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '15': 'ALL.chr15.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '16': 'ALL.chr16.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '17': 'ALL.chr17.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '18': 'ALL.chr18.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '19': 'ALL.chr19.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '20': 'ALL.chr20.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '21': 'ALL.chr21.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '22': 'ALL.chr22.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', 'X': 'ALL.chrX.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', 'Y': 'ALL.chrY.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz'}}
def
available_datasets_list() -> List[str]:
27def available_datasets_list() -> List[str]: 28 """ 29 Get the list of available datasets. 30 """ 31 return list(base_urls.keys())
Get the list of available datasets.
def
load_dataset( name: str, chromosomes: Union[List[str], List[int], str, int], variants_ids: Optional[List[str]] = None, sample_ids: Optional[List[str]] = None, verbose: bool = True, **read_kwargs) -> snputils.snp.genobj.SNPObject:
34def load_dataset( 35 name: str, 36 chromosomes: Union[List[str], List[int], str, int], 37 variants_ids: Optional[List[str]] = None, 38 sample_ids: Optional[List[str]] = None, 39 verbose: bool = True, 40 **read_kwargs 41) -> SNPObject: 42 """ 43 Load a genome dataset. 44 45 Args: 46 name (str): Name of the dataset to load. Call `available_datasets_list()` to get the list of available datasets. 47 chromosomes (List[str] | List[int] | str | int): Chromosomes to load. 48 variants_ids (List[str]): List of variant IDs to load. 49 sample_ids (List[str]): List of sample IDs to load. 50 verbose (bool): Whether to show progress. 51 **read_kwargs: Keyword arguments to pass to `PGENReader.read()`. 52 53 Returns: 54 SNPObject: SNPObject containing the loaded dataset. 55 """ 56 if isinstance(chromosomes, (str, int)): 57 chromosomes = [chromosomes] 58 chromosomes = [str(chr).lower().replace("chr", "") for chr in chromosomes] 59 60 if variants_ids is not None: 61 variants_ids_txt = tempfile.NamedTemporaryFile(mode='w') 62 variants_ids_txt.write("\n".join(variants_ids)) 63 variants_ids_txt.flush() 64 65 if sample_ids is not None: 66 sample_ids_txt = tempfile.NamedTemporaryFile(mode='w') 67 sample_ids_txt.write("\n".join(sample_ids)) 68 sample_ids_txt.flush() 69 70 merge_list_txt = tempfile.NamedTemporaryFile(mode='w') 71 72 data_home = get_data_home() 73 74 if name == "1kgp": 75 data_path = data_home / name 76 data_path.mkdir(parents=True, exist_ok=True) 77 for chr in chromosomes: 78 chr_path = data_path / chr_urls[name][chr] 79 if not Path(chr_path).exists(): 80 log.info(f"Downloading chromosome {chr}...") 81 download_url(f"{base_urls[name]}/{chr_urls[name][chr]}", chr_path, show_progress=verbose) 82 else: 83 log.info(f"Chromosome {chr} already exists. 
Skipping download.") 84 85 # Filter and convert to PGEN 86 log.info(f"Processing chromosome {chr}...") 87 out_file = chr_urls[name][chr].replace('.vcf.gz', '') 88 execute_plink_cmd( 89 ["--vcf", f"{chr_urls[name][chr]}"] 90 + (["--keep", sample_ids_txt.name] if sample_ids is not None else []) 91 + (["--extract", variants_ids_txt.name] if variants_ids is not None else []) 92 + [ 93 "--set-missing-var-ids", "@:#", 94 "--make-pgen", 95 "--out", out_file, 96 ], cwd=data_path) 97 merge_list_txt.write(f"{out_file}\n") 98 99 if len(chromosomes) > 1: 100 # Merge the PGEN files into single PGEN fileset 101 log.info("Merging PGEN files...") 102 merge_list_txt.flush() 103 print(f"Merge list file contents: {open(merge_list_txt.name, 'r').read()}") 104 execute_plink_cmd(["--pmerge-list", merge_list_txt.name, "--make-pgen", "--out", "1kgp"], 105 cwd=data_path) 106 else: 107 # Rename the single PGEN file 108 for ext in ["pgen", "psam", "pvar"]: 109 Path(data_path / f"{out_file}.{ext}").rename(data_path / f"1kgp.{ext}") 110 111 # Read PGEN fileset with PGENReader into SNPObject 112 log.info("Reading PGEN fileset...") 113 snpobj = PGENReader(data_path / "1kgp").read(**read_kwargs) 114 else: 115 raise NotImplementedError(f"Dataset {name} not implemented.") 116 117 if variants_ids is not None: 118 variants_ids_txt.close() 119 if sample_ids is not None: 120 sample_ids_txt.close() 121 merge_list_txt.close() 122 123 return snpobj
Load a genome dataset.
Arguments:
- name (str): Name of the dataset to load. Call `available_datasets_list()` to get the list of available datasets.
- chromosomes (List[str] | List[int] | str | int): Chromosomes to load.
- variants_ids (List[str]): List of variant IDs to load.
- sample_ids (List[str]): List of sample IDs to load.
- verbose (bool): Whether to show progress.
- **read_kwargs: Keyword arguments to pass to `PGENReader.read()`.
Returns:
SNPObject: SNPObject containing the loaded dataset.