snputils.visualization.admixture_viz

  1import subprocess
  2from typing import List, Optional, Dict, Any
  3import os
  4import sys
  5import argparse
  6import shutil
  7
  8
  9class PongVizError(Exception):
 10    """Custom exception for Pong visualization errors"""
 11    pass
 12
 13
 14class FileMapError(Exception):
 15    """Custom exception for filemap creation errors"""
 16    pass
 17
 18
 19def pong_viz(folder_runs: str, output_dir: str, k: Optional[int] = None, min_k: Optional[int] = None, max_k: Optional[int] = None,
 20             runs: List[int] = None, run_prefix: str = 'train', ind2pop_path: Optional[str] = None,
 21             pop_names_path: Optional[str] = None, color_list_path: Optional[str] = None, verbose: bool = False) -> None:
 22    """
 23    Executes Pong visualization with the specified parameters.
 24    """
 25
 26    # Check if pong is installed
 27    if not shutil.which('pong'):  # TODO: replace pong dependency with a custom implementation
 28        raise PongVizError("Pong is not installed. Please install it using 'pip install pong'")
 29
 30    # Validate input parameters
 31    if not os.path.isdir(folder_runs):
 32        raise PongVizError(f"Input folder does not exist: {folder_runs}")
 33    if not output_dir:
 34        raise PongVizError("Output directory must be specified")
 35    os.makedirs(output_dir, exist_ok=True)
 36
 37    # Optional files validation
 38    optional_files = {'ind2pop_path': ind2pop_path, 'pop_names_path': pop_names_path, 'color_list_path': color_list_path}
 39    for name, path in optional_files.items():
 40        if path and not os.path.isfile(path):
 41            raise PongVizError(f"Specified {name} file does not exist: {path}")
 42
 43    try:
 44        filemap_path = create_filemap(folder_runs, k, min_k, max_k, runs, run_prefix)
 45
 46        if verbose:
 47            print(f"Created filemap at: {filemap_path}")
 48
 49        # Build command
 50        cmd = ["pong", "-m", filemap_path, "-o", output_dir]
 51        if ind2pop_path:
 52            cmd.extend(["-i", ind2pop_path])
 53        if pop_names_path:
 54            cmd.extend(["-n", pop_names_path])
 55        if color_list_path:
 56            cmd.extend(["-l", color_list_path])
 57
 58        if verbose:
 59            print("Executing command:", " ".join(cmd))
 60
 61        # Execute pong
 62        subprocess.run(cmd)
 63
 64        if verbose:
 65            print("Pong execution successful")
 66            print("Results saved in:", output_dir)
 67
 68    except FileMapError as e:
 69        raise PongVizError(f"Error creating filemap: {str(e)}")
 70    except subprocess.CalledProcessError as e:
 71        raise PongVizError(f"Error executing Pong: {str(e)}\nOutput: {e.output}")
 72    except Exception as e:
 73        raise PongVizError(f"Unexpected error: {str(e)}")
 74
 75
 76def create_filemap(folder: str, k: Optional[int] = None, min_k: Optional[int] = None, max_k: Optional[int] = None,
 77                   runs: List[int] = None, run_prefix: str = 'train_demo',) -> str:
 78    """
 79    Creates a filemap for training files organized by k values and runs and saves it to a file.
 80
 81    Args:
 82        folder (str): Base folder path
 83        k (Optional[int]): Single k value to process. If specified, min_k and max_k are ignored
 84        min_k (Optional[int]): Minimum k value for range processing
 85        max_k (Optional[int]): Maximum k value for range processing
 86        runs (List[int]): List of run numbers
 87        run_prefix (str): Prefix for the run files (default: 'train_demo')
 88
 89    Returns:
 90        str: Path to saved file
 91
 92    Raises:
 93        FileMapError: If invalid parameters are provided or if configuration is incorrect
 94    """
 95    # Input validation
 96    if not folder:
 97        raise FileMapError("Folder path must be specified")
 98    if not runs:
 99        raise FileMapError("Runs list must be provided and non-empty")
100    if not all(isinstance(run, int) and run > 0 for run in runs):
101        raise FileMapError("All runs must be positive integers")
102    if k is not None:
103        if k < 1:
104            raise FileMapError("k must be a positive integer")
105        k_values = [k]
106    else:
107        if min_k is None or max_k is None:
108            raise FileMapError("Either k or both min_k and max_k must be specified")
109        if min_k > max_k:
110            raise FileMapError("min_k cannot be greater than max_k")
111        if min_k < 1:
112            raise FileMapError("min_k must be a positive integer")
113        k_values = range(min_k, max_k + 1)
114
115    # Write file:
116    filemap = {}
117    try:
118        for i, k_val in enumerate(k_values):
119            for run_num in range(runs[i]):
120                key = f'k{k_val}r{run_num+1}'
121                if folder != '.':
122                    path = folder + '/' + f'run{run_num+1}' + '/' + f'{run_prefix}.{k_val}.Q'
123                else:
124                    path = f'run{run_num+1}' + '/' + f'{run_prefix}.{k_val}.Q'
125
126                filemap[key] = (k_val, path)
127
128        filemap_path = os.path.join('pong_filemap')
129
130        with open(filemap_path, 'w') as f:
131            for i, (key, (k_val, path)) in enumerate(filemap.items()):
132                f.write(f"{key}\t{k_val}\t{path}\n")
133
134                if k_val in k_values:
135                    index = k_values.index(k_val)
136                    if (i + 1) % runs[index] == 0:
137                        f.write("\n")
138
139        return filemap_path
140
141    except Exception as e:
142        raise FileMapError(f"Error creating filemap: {str(e)}")
143
144
145def parse_pong_args() -> Dict[str, Any]:
146    """
147    Parse command line arguments for pong visualization.
148
149    Returns:
150        Dict[str, Any]: Dictionary containing all parsed arguments ready for pong_viz
151
152    Raises:
153        SystemExit: If argument parsing fails or --help is used
154    """
155    parser = argparse.ArgumentParser(
156        description='Execute Pong visualization with specified parameters',
157        formatter_class=argparse.ArgumentDefaultsHelpFormatter
158    )
159
160    parser.add_argument('folder_runs', help='Base folder containing the run files')
161    parser.add_argument('output_dir', help='Directory where results will be saved')
162
163    k_group = parser.add_mutually_exclusive_group(required=True)
164    k_group.add_argument('-k', '--k', type=int, help='Single k value to process')
165    k_group.add_argument('--k-range', nargs=2, type=int, metavar=('MIN_K', 'MAX_K'), help='Range of k values (min_k max_k)')
166
167    parser.add_argument('--runs', type=int, nargs='+', help='Number of runs for each K')
168    parser.add_argument('--prefix', default='train_demo', help='Prefix for run files')
169    parser.add_argument('-i', '--ind2pop', help='Path to individual-to-population mapping file')
170    parser.add_argument('-n', '--pop-names', help='Path to population names file')
171    parser.add_argument('-c', '--color-list', help='Path to color list file')
172    parser.add_argument('-v', '--verbose', action='store_true', help='Print detailed information during execution')
173    args = parser.parse_args()
174
175    params = {'folder_runs': args.folder_runs,
176              'output_dir': args.output_dir,
177              'runs': args.runs,
178              'run_prefix': args.prefix,
179              'verbose': args.verbose}
180
181    if args.k is not None:
182        params['k'] = args.k
183    else:
184        params['min_k'] = args.k_range[0]
185        params['max_k'] = args.k_range[1]
186
187    if args.ind2pop:
188        params['ind2pop_path'] = args.ind2pop
189    if args.pop_names:
190        params['pop_names_path'] = args.pop_names
191    if args.color_list:
192        params['color_list_path'] = args.color_list
193
194    return params
195
196
197if __name__ == "__main__":
198    try:
199        params = parse_pong_args()
200        pong_viz(**params)
201
202    except (FileMapError, PongVizError) as e:
203        print(f"Error: {e}", file=sys.stderr)
204        sys.exit(1)
205    except Exception as e:
206        print(f"Unexpected error: {e}", file=sys.stderr)
207        sys.exit(1)
class PongVizError(builtins.Exception):
10class PongVizError(Exception):
11    """Custom exception for Pong visualization errors"""
12    pass

Custom exception for Pong visualization errors

class FileMapError(builtins.Exception):
15class FileMapError(Exception):
16    """Custom exception for filemap creation errors"""
17    pass

Custom exception for filemap creation errors

def pong_viz( folder_runs: str, output_dir: str, k: Optional[int] = None, min_k: Optional[int] = None, max_k: Optional[int] = None, runs: List[int] = None, run_prefix: str = 'train', ind2pop_path: Optional[str] = None, pop_names_path: Optional[str] = None, color_list_path: Optional[str] = None, verbose: bool = False) -> None:
20def pong_viz(folder_runs: str, output_dir: str, k: Optional[int] = None, min_k: Optional[int] = None, max_k: Optional[int] = None,
21             runs: List[int] = None, run_prefix: str = 'train', ind2pop_path: Optional[str] = None,
22             pop_names_path: Optional[str] = None, color_list_path: Optional[str] = None, verbose: bool = False) -> None:
23    """
24    Executes Pong visualization with the specified parameters.
25    """
26
27    # Check if pong is installed
28    if not shutil.which('pong'):  # TODO: replace pong dependency with a custom implementation
29        raise PongVizError("Pong is not installed. Please install it using 'pip install pong'")
30
31    # Validate input parameters
32    if not os.path.isdir(folder_runs):
33        raise PongVizError(f"Input folder does not exist: {folder_runs}")
34    if not output_dir:
35        raise PongVizError("Output directory must be specified")
36    os.makedirs(output_dir, exist_ok=True)
37
38    # Optional files validation
39    optional_files = {'ind2pop_path': ind2pop_path, 'pop_names_path': pop_names_path, 'color_list_path': color_list_path}
40    for name, path in optional_files.items():
41        if path and not os.path.isfile(path):
42            raise PongVizError(f"Specified {name} file does not exist: {path}")
43
44    try:
45        filemap_path = create_filemap(folder_runs, k, min_k, max_k, runs, run_prefix)
46
47        if verbose:
48            print(f"Created filemap at: {filemap_path}")
49
50        # Build command
51        cmd = ["pong", "-m", filemap_path, "-o", output_dir]
52        if ind2pop_path:
53            cmd.extend(["-i", ind2pop_path])
54        if pop_names_path:
55            cmd.extend(["-n", pop_names_path])
56        if color_list_path:
57            cmd.extend(["-l", color_list_path])
58
59        if verbose:
60            print("Executing command:", " ".join(cmd))
61
62        # Execute pong
63        subprocess.run(cmd)
64
65        if verbose:
66            print("Pong execution successful")
67            print("Results saved in:", output_dir)
68
69    except FileMapError as e:
70        raise PongVizError(f"Error creating filemap: {str(e)}")
71    except subprocess.CalledProcessError as e:
72        raise PongVizError(f"Error executing Pong: {str(e)}\nOutput: {e.output}")
73    except Exception as e:
74        raise PongVizError(f"Unexpected error: {str(e)}")

Executes Pong visualization with the specified parameters.

def create_filemap( folder: str, k: Optional[int] = None, min_k: Optional[int] = None, max_k: Optional[int] = None, runs: List[int] = None, run_prefix: str = 'train_demo') -> str:
 77def create_filemap(folder: str, k: Optional[int] = None, min_k: Optional[int] = None, max_k: Optional[int] = None,
 78                   runs: List[int] = None, run_prefix: str = 'train_demo',) -> str:
 79    """
 80    Creates a filemap for training files organized by k values and runs and saves it to a file.
 81
 82    Args:
 83        folder (str): Base folder path
 84        k (Optional[int]): Single k value to process. If specified, min_k and max_k are ignored
 85        min_k (Optional[int]): Minimum k value for range processing
 86        max_k (Optional[int]): Maximum k value for range processing
 87        runs (List[int]): List of run numbers
 88        run_prefix (str): Prefix for the run files (default: 'train_demo')
 89
 90    Returns:
 91        str: Path to saved file
 92
 93    Raises:
 94        FileMapError: If invalid parameters are provided or if configuration is incorrect
 95    """
 96    # Input validation
 97    if not folder:
 98        raise FileMapError("Folder path must be specified")
 99    if not runs:
100        raise FileMapError("Runs list must be provided and non-empty")
101    if not all(isinstance(run, int) and run > 0 for run in runs):
102        raise FileMapError("All runs must be positive integers")
103    if k is not None:
104        if k < 1:
105            raise FileMapError("k must be a positive integer")
106        k_values = [k]
107    else:
108        if min_k is None or max_k is None:
109            raise FileMapError("Either k or both min_k and max_k must be specified")
110        if min_k > max_k:
111            raise FileMapError("min_k cannot be greater than max_k")
112        if min_k < 1:
113            raise FileMapError("min_k must be a positive integer")
114        k_values = range(min_k, max_k + 1)
115
116    # Write file:
117    filemap = {}
118    try:
119        for i, k_val in enumerate(k_values):
120            for run_num in range(runs[i]):
121                key = f'k{k_val}r{run_num+1}'
122                if folder != '.':
123                    path = folder + '/' + f'run{run_num+1}' + '/' + f'{run_prefix}.{k_val}.Q'
124                else:
125                    path = f'run{run_num+1}' + '/' + f'{run_prefix}.{k_val}.Q'
126
127                filemap[key] = (k_val, path)
128
129        filemap_path = os.path.join('pong_filemap')
130
131        with open(filemap_path, 'w') as f:
132            for i, (key, (k_val, path)) in enumerate(filemap.items()):
133                f.write(f"{key}\t{k_val}\t{path}\n")
134
135                if k_val in k_values:
136                    index = k_values.index(k_val)
137                    if (i + 1) % runs[index] == 0:
138                        f.write("\n")
139
140        return filemap_path
141
142    except Exception as e:
143        raise FileMapError(f"Error creating filemap: {str(e)}")

Creates a filemap for training files organized by k values and runs and saves it to a file.

Arguments:
  • folder (str): Base folder path
  • k (Optional[int]): Single k value to process. If specified, min_k and max_k are ignored
  • min_k (Optional[int]): Minimum k value for range processing
  • max_k (Optional[int]): Maximum k value for range processing
  • runs (List[int]): List of run numbers
  • run_prefix (str): Prefix for the run files (default: 'train_demo')
Returns:

str: Path to saved file

Raises:
  • FileMapError: If invalid parameters are provided or if configuration is incorrect
def parse_pong_args() -> Dict[str, Any]:
146def parse_pong_args() -> Dict[str, Any]:
147    """
148    Parse command line arguments for pong visualization.
149
150    Returns:
151        Dict[str, Any]: Dictionary containing all parsed arguments ready for pong_viz
152
153    Raises:
154        SystemExit: If argument parsing fails or --help is used
155    """
156    parser = argparse.ArgumentParser(
157        description='Execute Pong visualization with specified parameters',
158        formatter_class=argparse.ArgumentDefaultsHelpFormatter
159    )
160
161    parser.add_argument('folder_runs', help='Base folder containing the run files')
162    parser.add_argument('output_dir', help='Directory where results will be saved')
163
164    k_group = parser.add_mutually_exclusive_group(required=True)
165    k_group.add_argument('-k', '--k', type=int, help='Single k value to process')
166    k_group.add_argument('--k-range', nargs=2, type=int, metavar=('MIN_K', 'MAX_K'), help='Range of k values (min_k max_k)')
167
168    parser.add_argument('--runs', type=int, nargs='+', help='Number of runs for each K')
169    parser.add_argument('--prefix', default='train_demo', help='Prefix for run files')
170    parser.add_argument('-i', '--ind2pop', help='Path to individual-to-population mapping file')
171    parser.add_argument('-n', '--pop-names', help='Path to population names file')
172    parser.add_argument('-c', '--color-list', help='Path to color list file')
173    parser.add_argument('-v', '--verbose', action='store_true', help='Print detailed information during execution')
174    args = parser.parse_args()
175
176    params = {'folder_runs': args.folder_runs,
177              'output_dir': args.output_dir,
178              'runs': args.runs,
179              'run_prefix': args.prefix,
180              'verbose': args.verbose}
181
182    if args.k is not None:
183        params['k'] = args.k
184    else:
185        params['min_k'] = args.k_range[0]
186        params['max_k'] = args.k_range[1]
187
188    if args.ind2pop:
189        params['ind2pop_path'] = args.ind2pop
190    if args.pop_names:
191        params['pop_names_path'] = args.pop_names
192    if args.color_list:
193        params['color_list_path'] = args.color_list
194
195    return params

Parse command line arguments for pong visualization.

Returns:

Dict[str, Any]: Dictionary containing all parsed arguments ready for pong_viz

Raises:
  • SystemExit: If argument parsing fails or --help is used