snputils.datasets

1from .load_dataset import load_dataset
2
3__all__ = [
4    "load_dataset"
5]
def load_dataset( name: str, chromosomes: List[str] | List[int] | str | int, variants_ids: List[str] | None = None, sample_ids: List[str] | None = None, verbose: bool = True, **read_kwargs) -> snputils.snp.genobj.SNPObject:
 34def load_dataset(
 35        name: str,
 36        chromosomes: Union[List[str], List[int], str, int],
 37        variants_ids: Optional[List[str]] = None,
 38        sample_ids: Optional[List[str]] = None,
 39        verbose: bool = True,
 40        **read_kwargs
 41) -> SNPObject:
 42    """
 43    Load a genome dataset.
 44
 45    Args:
 46        name (str): Name of the dataset to load. Call `available_datasets_list()` to get the list of available datasets.
 47        chromosomes (List[str] | List[int] | str | int): Chromosomes to load.
 48        variants_ids (List[str]): List of variant IDs to load.
 49        sample_ids (List[str]): List of sample IDs to load.
 50        verbose (bool): Whether to show progress.
 51        **read_kwargs: Keyword arguments to pass to `PGENReader.read()`.
 52
 53    Returns:
 54        SNPObject: SNPObject containing the loaded dataset.
 55    """
 56    if isinstance(chromosomes, (str, int)):
 57        chromosomes = [chromosomes]
 58    chromosomes = [str(chr).lower().replace("chr", "") for chr in chromosomes]
 59
 60    if variants_ids is not None:
 61        variants_ids_txt = tempfile.NamedTemporaryFile(mode='w')
 62        variants_ids_txt.write("\n".join(variants_ids))
 63        variants_ids_txt.flush()
 64
 65    if sample_ids is not None:
 66        sample_ids_txt = tempfile.NamedTemporaryFile(mode='w')
 67        sample_ids_txt.write("\n".join(sample_ids))
 68        sample_ids_txt.flush()
 69
 70    merge_list_txt = tempfile.NamedTemporaryFile(mode='w')
 71
 72    data_home = get_data_home()
 73
 74    if name == "1kgp":
 75        data_path = data_home / name
 76        data_path.mkdir(parents=True, exist_ok=True)
 77        for chr in chromosomes:
 78            chr_path = data_path / chr_urls[name][chr]
 79            if not Path(chr_path).exists():
 80                log.info(f"Downloading chromosome {chr}...")
 81                download_url(f"{base_urls[name]}/{chr_urls[name][chr]}", chr_path, show_progress=verbose)
 82            else:
 83                log.info(f"Chromosome {chr} already exists. Skipping download.")
 84
 85            # Filter and convert to PGEN
 86            log.info(f"Processing chromosome {chr}...")
 87            out_file = chr_urls[name][chr].replace('.vcf.gz', '')
 88            execute_plink_cmd(
 89                ["--vcf", f"{chr_urls[name][chr]}"]
 90                + (["--keep", sample_ids_txt.name] if sample_ids is not None else [])
 91                + (["--extract", variants_ids_txt.name] if variants_ids is not None else [])
 92                + [
 93                    "--set-missing-var-ids", "@:#",
 94                    "--make-pgen",
 95                    "--out", out_file,
 96                ], cwd=data_path)
 97            merge_list_txt.write(f"{out_file}\n")
 98
 99        if len(chromosomes) > 1:
100            # Merge the PGEN files into single PGEN fileset
101            log.info("Merging PGEN files...")
102            merge_list_txt.flush()
103            print(f"Merge list file contents: {open(merge_list_txt.name, 'r').read()}")
104            execute_plink_cmd(["--pmerge-list", merge_list_txt.name, "--make-pgen", "--out", "1kgp"],
105                              cwd=data_path)
106        else:
107            # Rename the single PGEN file
108            for ext in ["pgen", "psam", "pvar"]:
109                Path(data_path / f"{out_file}.{ext}").rename(data_path / f"1kgp.{ext}")
110
111        # Read PGEN fileset with PGENReader into SNPObject
112        log.info("Reading PGEN fileset...")
113        snpobj = PGENReader(data_path / "1kgp").read(**read_kwargs)
114    else:
115        raise NotImplementedError(f"Dataset {name} not implemented.")
116
117    if variants_ids is not None:
118        variants_ids_txt.close()
119    if sample_ids is not None:
120        sample_ids_txt.close()
121    merge_list_txt.close()
122
123    return snpobj

Load a genome dataset.

Arguments:
  • name (str): Name of the dataset to load. Call available_datasets_list() to get the list of available datasets.
  • chromosomes (List[str] | List[int] | str | int): Chromosomes to load.
  • variants_ids (List[str]): List of variant IDs to load.
  • sample_ids (List[str]): List of sample IDs to load.
  • verbose (bool): Whether to show progress.
  • **read_kwargs: Keyword arguments to pass to PGENReader.read().
Returns:

SNPObject: SNPObject containing the loaded dataset.