snputils.datasets
def load_dataset(
    name: str,
    chromosomes: List[str] | List[int] | str | int,
    variants_ids: List[str] | None = None,
    sample_ids: List[str] | None = None,
    verbose: bool = True,
    **read_kwargs
) -> snputils.snp.genobj.SNPObject:
def load_dataset(
    name: str,
    chromosomes: Union[List[str], List[int], str, int],
    variants_ids: Optional[List[str]] = None,
    sample_ids: Optional[List[str]] = None,
    verbose: bool = True,
    **read_kwargs
) -> SNPObject:
    """
    Load a genome dataset.

    Args:
        name (str): Name of the dataset to load. Call `available_datasets_list()` to get the list of available datasets.
        chromosomes (List[str] | List[int] | str | int): Chromosomes to load.
        variants_ids (List[str]): List of variant IDs to load.
        sample_ids (List[str]): List of sample IDs to load.
        verbose (bool): Whether to show progress.
        **read_kwargs: Keyword arguments to pass to `PGENReader.read()`.

    Returns:
        SNPObject: SNPObject containing the loaded dataset.
    """
    if isinstance(chromosomes, (str, int)):
        chromosomes = [chromosomes]
    chromosomes = [str(chr).lower().replace("chr", "") for chr in chromosomes]

    if variants_ids is not None:
        variants_ids_txt = tempfile.NamedTemporaryFile(mode='w')
        variants_ids_txt.write("\n".join(variants_ids))
        variants_ids_txt.flush()

    if sample_ids is not None:
        sample_ids_txt = tempfile.NamedTemporaryFile(mode='w')
        sample_ids_txt.write("\n".join(sample_ids))
        sample_ids_txt.flush()

    merge_list_txt = tempfile.NamedTemporaryFile(mode='w')

    data_home = get_data_home()

    if name == "1kgp":
        data_path = data_home / name
        data_path.mkdir(parents=True, exist_ok=True)
        for chr in chromosomes:
            chr_path = data_path / chr_urls[name][chr]
            if not Path(chr_path).exists():
                log.info(f"Downloading chromosome {chr}...")
                download_url(f"{base_urls[name]}/{chr_urls[name][chr]}", chr_path, show_progress=verbose)
            else:
                log.info(f"Chromosome {chr} already exists. Skipping download.")

            # Filter and convert to PGEN
            log.info(f"Processing chromosome {chr}...")
            out_file = chr_urls[name][chr].replace('.vcf.gz', '')
            execute_plink_cmd(
                ["--vcf", f"{chr_urls[name][chr]}"]
                + (["--keep", sample_ids_txt.name] if sample_ids is not None else [])
                + (["--extract", variants_ids_txt.name] if variants_ids is not None else [])
                + [
                    "--set-missing-var-ids", "@:#",
                    "--make-pgen",
                    "--out", out_file,
                ], cwd=data_path)
            merge_list_txt.write(f"{out_file}\n")

        if len(chromosomes) > 1:
            # Merge the PGEN files into single PGEN fileset
            log.info("Merging PGEN files...")
            merge_list_txt.flush()
            print(f"Merge list file contents: {open(merge_list_txt.name, 'r').read()}")
            execute_plink_cmd(["--pmerge-list", merge_list_txt.name, "--make-pgen", "--out", "1kgp"],
                              cwd=data_path)
        else:
            # Rename the single PGEN file
            for ext in ["pgen", "psam", "pvar"]:
                Path(data_path / f"{out_file}.{ext}").rename(data_path / f"1kgp.{ext}")

        # Read PGEN fileset with PGENReader into SNPObject
        log.info("Reading PGEN fileset...")
        snpobj = PGENReader(data_path / "1kgp").read(**read_kwargs)
    else:
        raise NotImplementedError(f"Dataset {name} not implemented.")

    if variants_ids is not None:
        variants_ids_txt.close()
    if sample_ids is not None:
        sample_ids_txt.close()
    merge_list_txt.close()

    return snpobj
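As the listing shows, sample_ids and variants_ids are written to temporary files and forwarded to PLINK as --keep and --extract, so both filters are applied while each per-chromosome VCF is converted to PGEN, before the filesets are merged. A short sketch of a filtered call (the sample and variant IDs below are illustrative placeholders, not verified 1000 Genomes identifiers):

    from snputils.datasets import load_dataset

    # Restrict processing to chromosome 22 and apply sample/variant filters.
    # sample_ids is forwarded to PLINK via --keep, variants_ids via --extract.
    snpobj = load_dataset(
        name="1kgp",
        chromosomes="chr22",                # "chr" prefixes are stripped internally
        sample_ids=["HG00096", "HG00097"],  # placeholder sample IDs
        variants_ids=["rs123456"],          # placeholder variant ID
    )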
Load a genome dataset.
Arguments:
- name (str): Name of the dataset to load. Call available_datasets_list() to get the list of available datasets.
- chromosomes (List[str] | List[int] | str | int): Chromosomes to load.
- variants_ids (List[str] | None): List of variant IDs to load. If None, all variants are loaded.
- sample_ids (List[str] | None): List of sample IDs to load. If None, all samples are kept.
- verbose (bool): Whether to show progress.
- **read_kwargs: Keyword arguments to pass to PGENReader.read().
Returns:
SNPObject: SNPObject containing the loaded dataset.
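A minimal usage sketch. Only load_dataset and its documented arguments come from this page; the attribute access on the returned object is an assumption for illustration and may differ from the actual snputils.snp.genobj.SNPObject interface:

    from snputils.datasets import load_dataset

    # Download, filter, and merge chromosomes 21 and 22 of the 1000 Genomes
    # Project, then read the merged PGEN fileset into an SNPObject.
    snpobj = load_dataset(name="1kgp", chromosomes=[21, 22])

    # Inspect the result. The attribute names below are assumptions; consult
    # the SNPObject documentation for the exact interface.
    print(snpobj.calldata_gt.shape)
    print(snpobj.samples[:5])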