snputils.datasets.load_dataset

  1import logging
  2from typing import Optional, Union, List
  3import tempfile
  4from pathlib import Path
  5
  6from snputils.snp.genobj.snpobj import SNPObject
  7from snputils.snp.io.read.pgen import PGENReader
  8from snputils._utils.data_home import get_data_home
  9from snputils._utils.plink import execute_plink_cmd
 10from snputils._utils.download import download_url
 11
 12log = logging.getLogger(__name__)
 13
 14base_urls = {
 15    "1kgp": "https://ftp.1000genomes.ebi.ac.uk/vol1/ftp/data_collections/1000_genomes_project/release/20181203_biallelic_SNV"
 16}
 17
 18chr_list = list(range(1, 23)) + ["X", "Y"]
 19chr_urls = {
 20    "1kgp": {
 21        str(i): f"ALL.chr{i}.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz" for i in chr_list
 22    }
 23}
 24
 25
 26def available_datasets_list() -> List[str]:
 27    """
 28    Get the list of available datasets.
 29    """
 30    return list(base_urls.keys())
 31
 32
 33def load_dataset(
 34        name: str,
 35        chromosomes: Union[List[str], List[int], str, int],
 36        variants_ids: Optional[List[str]] = None,
 37        sample_ids: Optional[List[str]] = None,
 38        verbose: bool = True,
 39        **read_kwargs
 40) -> SNPObject:
 41    """
 42    Load a genome dataset.
 43
 44    Args:
 45        name (str): Name of the dataset to load. Call `available_datasets_list()` to get the list of available datasets.
 46        chromosomes (List[str] | List[int] | str | int): Chromosomes to load.
 47        variants_ids (List[str]): List of variant IDs to load.
 48        sample_ids (List[str]): List of sample IDs to load.
 49        verbose (bool): Whether to show progress.
 50        **read_kwargs: Keyword arguments to pass to `PGENReader.read()`.
 51
 52    Returns:
 53        SNPObject: SNPObject containing the loaded dataset.
 54    """
 55    if isinstance(chromosomes, (str, int)):
 56        chromosomes = [chromosomes]
 57    chromosomes = [str(chr).lower().replace("chr", "") for chr in chromosomes]
 58
 59    if variants_ids is not None:
 60        variants_ids_txt = tempfile.NamedTemporaryFile(mode='w')
 61        variants_ids_txt.write("\n".join(variants_ids))
 62        variants_ids_txt.flush()
 63
 64    if sample_ids is not None:
 65        sample_ids_txt = tempfile.NamedTemporaryFile(mode='w')
 66        sample_ids_txt.write("\n".join(sample_ids))
 67        sample_ids_txt.flush()
 68
 69    merge_list_txt = tempfile.NamedTemporaryFile(mode='w')
 70
 71    data_home = get_data_home()
 72
 73    if name == "1kgp":
 74        data_path = data_home / name
 75        data_path.mkdir(parents=True, exist_ok=True)
 76        for chr in chromosomes:
 77            chr_path = data_path / chr_urls[name][chr]
 78            if not Path(chr_path).exists():
 79                log.info(f"Downloading chromosome {chr}...")
 80                download_url(f"{base_urls[name]}/{chr_urls[name][chr]}", chr_path, show_progress=verbose)
 81            else:
 82                log.info(f"Chromosome {chr} already exists. Skipping download.")
 83
 84            # Filter and convert to PGEN
 85            log.info(f"Processing chromosome {chr}...")
 86            out_file = chr_urls[name][chr].replace('.vcf.gz', '')
 87            execute_plink_cmd(
 88                ["--vcf", f"{chr_urls[name][chr]}"]
 89                + (["--keep", sample_ids_txt.name] if sample_ids is not None else [])
 90                + (["--extract", variants_ids_txt.name] if variants_ids is not None else [])
 91                + [
 92                    "--set-missing-var-ids", "@:#",
 93                    "--make-pgen",
 94                    "--out", out_file,
 95                ], cwd=data_path)
 96            merge_list_txt.write(f"{out_file}\n")
 97
 98        if len(chromosomes) > 1:
 99            # Merge the PGEN files into single PGEN fileset
100            log.info("Merging PGEN files...")
101            merge_list_txt.flush()
102            print(f"Merge list file contents: {open(merge_list_txt.name, 'r').read()}")
103            execute_plink_cmd(["--pmerge-list", merge_list_txt.name, "--make-pgen", "--out", "1kgp"],
104                              cwd=data_path)
105        else:
106            # Rename the single PGEN file
107            for ext in ["pgen", "psam", "pvar"]:
108                Path(data_path / f"{out_file}.{ext}").rename(data_path / f"1kgp.{ext}")
109
110        # Read PGEN fileset with PGENReader into SNPObject
111        log.info("Reading PGEN fileset...")
112        snpobj = PGENReader(data_path / "1kgp").read(**read_kwargs)
113    else:
114        raise NotImplementedError(f"Dataset {name} not implemented.")
115
116    if variants_ids is not None:
117        variants_ids_txt.close()
118    if sample_ids is not None:
119        sample_ids_txt.close()
120    merge_list_txt.close()
121
122    return snpobj
log = <Logger snputils.datasets.load_dataset (WARNING)>
base_urls = {'1kgp': 'https://ftp.1000genomes.ebi.ac.uk/vol1/ftp/data_collections/1000_genomes_project/release/20181203_biallelic_SNV'}
chr_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 'X', 'Y']
chr_urls = {'1kgp': {'1': 'ALL.chr1.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '2': 'ALL.chr2.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '3': 'ALL.chr3.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '4': 'ALL.chr4.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '5': 'ALL.chr5.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '6': 'ALL.chr6.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '7': 'ALL.chr7.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '8': 'ALL.chr8.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '9': 'ALL.chr9.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '10': 'ALL.chr10.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '11': 'ALL.chr11.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '12': 'ALL.chr12.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '13': 'ALL.chr13.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '14': 'ALL.chr14.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '15': 'ALL.chr15.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '16': 'ALL.chr16.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '17': 'ALL.chr17.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '18': 'ALL.chr18.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '19': 'ALL.chr19.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '20': 'ALL.chr20.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '21': 'ALL.chr21.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', '22': 'ALL.chr22.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', 'X': 'ALL.chrX.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz', 'Y': 'ALL.chrY.shapeit2_integrated_v1a.GRCh38.20181129.phased.vcf.gz'}}
def available_datasets_list() -> List[str]:
27def available_datasets_list() -> List[str]:
28    """
29    Get the list of available datasets.
30    """
31    return list(base_urls.keys())

Get the list of available datasets.

def load_dataset( name: str, chromosomes: Union[List[str], List[int], str, int], variants_ids: Optional[List[str]] = None, sample_ids: Optional[List[str]] = None, verbose: bool = True, **read_kwargs) -> snputils.snp.genobj.SNPObject:
 34def load_dataset(
 35        name: str,
 36        chromosomes: Union[List[str], List[int], str, int],
 37        variants_ids: Optional[List[str]] = None,
 38        sample_ids: Optional[List[str]] = None,
 39        verbose: bool = True,
 40        **read_kwargs
 41) -> SNPObject:
 42    """
 43    Load a genome dataset.
 44
 45    Args:
 46        name (str): Name of the dataset to load. Call `available_datasets_list()` to get the list of available datasets.
 47        chromosomes (List[str] | List[int] | str | int): Chromosomes to load.
 48        variants_ids (List[str]): List of variant IDs to load.
 49        sample_ids (List[str]): List of sample IDs to load.
 50        verbose (bool): Whether to show progress.
 51        **read_kwargs: Keyword arguments to pass to `PGENReader.read()`.
 52
 53    Returns:
 54        SNPObject: SNPObject containing the loaded dataset.
 55    """
 56    if isinstance(chromosomes, (str, int)):
 57        chromosomes = [chromosomes]
 58    chromosomes = [str(chr).lower().replace("chr", "") for chr in chromosomes]
 59
 60    if variants_ids is not None:
 61        variants_ids_txt = tempfile.NamedTemporaryFile(mode='w')
 62        variants_ids_txt.write("\n".join(variants_ids))
 63        variants_ids_txt.flush()
 64
 65    if sample_ids is not None:
 66        sample_ids_txt = tempfile.NamedTemporaryFile(mode='w')
 67        sample_ids_txt.write("\n".join(sample_ids))
 68        sample_ids_txt.flush()
 69
 70    merge_list_txt = tempfile.NamedTemporaryFile(mode='w')
 71
 72    data_home = get_data_home()
 73
 74    if name == "1kgp":
 75        data_path = data_home / name
 76        data_path.mkdir(parents=True, exist_ok=True)
 77        for chr in chromosomes:
 78            chr_path = data_path / chr_urls[name][chr]
 79            if not Path(chr_path).exists():
 80                log.info(f"Downloading chromosome {chr}...")
 81                download_url(f"{base_urls[name]}/{chr_urls[name][chr]}", chr_path, show_progress=verbose)
 82            else:
 83                log.info(f"Chromosome {chr} already exists. Skipping download.")
 84
 85            # Filter and convert to PGEN
 86            log.info(f"Processing chromosome {chr}...")
 87            out_file = chr_urls[name][chr].replace('.vcf.gz', '')
 88            execute_plink_cmd(
 89                ["--vcf", f"{chr_urls[name][chr]}"]
 90                + (["--keep", sample_ids_txt.name] if sample_ids is not None else [])
 91                + (["--extract", variants_ids_txt.name] if variants_ids is not None else [])
 92                + [
 93                    "--set-missing-var-ids", "@:#",
 94                    "--make-pgen",
 95                    "--out", out_file,
 96                ], cwd=data_path)
 97            merge_list_txt.write(f"{out_file}\n")
 98
 99        if len(chromosomes) > 1:
100            # Merge the PGEN files into single PGEN fileset
101            log.info("Merging PGEN files...")
102            merge_list_txt.flush()
103            print(f"Merge list file contents: {open(merge_list_txt.name, 'r').read()}")
104            execute_plink_cmd(["--pmerge-list", merge_list_txt.name, "--make-pgen", "--out", "1kgp"],
105                              cwd=data_path)
106        else:
107            # Rename the single PGEN file
108            for ext in ["pgen", "psam", "pvar"]:
109                Path(data_path / f"{out_file}.{ext}").rename(data_path / f"1kgp.{ext}")
110
111        # Read PGEN fileset with PGENReader into SNPObject
112        log.info("Reading PGEN fileset...")
113        snpobj = PGENReader(data_path / "1kgp").read(**read_kwargs)
114    else:
115        raise NotImplementedError(f"Dataset {name} not implemented.")
116
117    if variants_ids is not None:
118        variants_ids_txt.close()
119    if sample_ids is not None:
120        sample_ids_txt.close()
121    merge_list_txt.close()
122
123    return snpobj

Load a genome dataset.

Arguments:
  • name (str): Name of the dataset to load. Call available_datasets_list() to get the list of available datasets.
  • chromosomes (List[str] | List[int] | str | int): Chromosomes to load.
  • variants_ids (List[str]): List of variant IDs to load.
  • sample_ids (List[str]): List of sample IDs to load.
  • verbose (bool): Whether to show progress.
  • **read_kwargs: Keyword arguments to pass to PGENReader.read().
Returns:

SNPObject: SNPObject containing the loaded dataset.