snputils.phenotype.io
class
UKBPhenReader(snputils.phenotype.io.read.base.PhenotypeBaseReader):
class UKBPhenReader(PhenotypeBaseReader):
    """
    A class for reading data from a `.phe` file and constructing a `UKBPhenotypeObject`.
    """
    def __init__(self, file: Union[str, Path]) -> None:
        """
        Args:
            file (str or pathlib.Path):
                Path to the `.phe` file containing UKB phenotype data.
        """
        # Must be the same name-mangled attribute that the `file` property reads;
        # storing it under `_file` made every access to `self.file` raise AttributeError.
        self.__file = file

    @property
    def file(self) -> Path:
        """
        Retrieve `file`.

        Returns:
            pathlib.Path:
                Path to the `.phe` file containing UKB phenotype data.
        """
        return self.__file

    def read(self) -> 'UKBPhenotypeObject':
        """
        Read data from a `.phe` file and construct a `UKBPhenotypeObject`.

        The file is parsed as whitespace-delimited text without a header row; its
        three columns are read as `FID`, `IID` and `status`. Rows with `status == 2`
        are collected as cases and rows with `status == 1` as controls. Sample
        identifiers are taken from the `FID` column.

        Returns:
            UKBPhenotypeObject:
                A UKB phenotype object instance.

        Raises:
            ValueError: If the file contains no cases, no controls, or any row whose
                status is neither 1 nor 2 (detected through the sample count check).
        """
        log.info(f"Reading .phe file from '{self.file}'...")

        # `sep=r"\s+"` is the documented equivalent of the deprecated
        # `delim_whitespace=True` and works on old and new pandas releases alike.
        phen_df = pd.read_csv(
            self.file,
            header=None,
            sep=r"\s+",
            names=["FID", "IID", "status"],
            dtype={"FID": str, "IID": str, "status": int},
        )

        def both_haplotypes(ids):
            # Labels are grouped by haplotype: every ".0" entry first, then every ".1".
            return [f"{i}.0" for i in ids] + [f"{i}.1" for i in ids]

        # Sample IDs of cases (status 2)
        cases_IDs = phen_df.loc[phen_df["status"] == 2, "FID"].tolist()
        n_cases = len(cases_IDs)
        if n_cases == 0:
            raise ValueError("No case data available.")

        # Sample IDs of controls (status 1)
        controls_IDs = phen_df.loc[phen_df["status"] == 1, "FID"].tolist()
        n_controls = len(controls_IDs)
        if n_controls == 0:
            raise ValueError("No control data available.")

        # Every row must be either a case or a control
        sample_IDs = phen_df["FID"].tolist()
        n_samples = len(sample_IDs)
        if n_samples != (n_cases + n_controls):
            raise ValueError(
                "Total sample count does not match the combined count of cases and controls. "
                f"Expected {n_cases + n_controls}; found {n_samples}."
            )

        return UKBPhenotypeObject(
            samples=sample_IDs,
            n_samples=n_samples,
            cases=cases_IDs,
            n_cases=n_cases,
            controls=controls_IDs,
            n_controls=n_controls,
            all_haplotypes=both_haplotypes(sample_IDs),
            cases_haplotypes=both_haplotypes(cases_IDs),
            controls_haplotypes=both_haplotypes(controls_IDs),
        )
A class for reading data from a `.phe` file and constructing a `UKBPhenotypeObject`.
UKBPhenReader(file: Union[str, pathlib._local.Path])
def __init__(self, file: Union[str, Path]) -> None:
    """
    Store the location of the `.phe` file to be read.

    Args:
        file (str or pathlib.Path):
            Path to the `.phe` file containing UKB phenotype data.
    """
    # The `file` property of this class returns `self.__file`; the path has to be
    # stored under that same name-mangled attribute or the property raises
    # AttributeError.
    self.__file = file
Arguments:
- file (str or pathlib.Path): Path to the `.phe` file containing UKB phenotype data.
file: pathlib._local.Path
def read(self) -> 'UKBPhenotypeObject':
    """
    Read data from a `.phe` file and construct a `UKBPhenotypeObject`.

    The file is parsed as whitespace-delimited text without a header row; its
    three columns are read as `FID`, `IID` and `status`. Rows with `status == 2`
    are collected as cases and rows with `status == 1` as controls. Sample
    identifiers are taken from the `FID` column.

    Returns:
        UKBPhenotypeObject:
            A UKB phenotype object instance.

    Raises:
        ValueError: If the file contains no cases, no controls, or any row whose
            status is neither 1 nor 2 (detected through the sample count check).
    """
    log.info(f"Reading .phe file from '{self.file}'...")

    # `sep=r"\s+"` is the documented equivalent of the deprecated
    # `delim_whitespace=True` and works on old and new pandas releases alike.
    phen_df = pd.read_csv(
        self.file,
        header=None,
        sep=r"\s+",
        names=["FID", "IID", "status"],
        dtype={"FID": str, "IID": str, "status": int},
    )

    def both_haplotypes(ids):
        # Labels are grouped by haplotype: every ".0" entry first, then every ".1".
        return [f"{i}.0" for i in ids] + [f"{i}.1" for i in ids]

    # Sample IDs of cases (status 2)
    cases_IDs = phen_df.loc[phen_df["status"] == 2, "FID"].tolist()
    n_cases = len(cases_IDs)
    if n_cases == 0:
        raise ValueError("No case data available.")

    # Sample IDs of controls (status 1)
    controls_IDs = phen_df.loc[phen_df["status"] == 1, "FID"].tolist()
    n_controls = len(controls_IDs)
    if n_controls == 0:
        raise ValueError("No control data available.")

    # Every row must be either a case or a control
    sample_IDs = phen_df["FID"].tolist()
    n_samples = len(sample_IDs)
    if n_samples != (n_cases + n_controls):
        raise ValueError(
            "Total sample count does not match the combined count of cases and controls. "
            f"Expected {n_cases + n_controls}; found {n_samples}."
        )

    return UKBPhenotypeObject(
        samples=sample_IDs,
        n_samples=n_samples,
        cases=cases_IDs,
        n_cases=n_cases,
        controls=controls_IDs,
        n_controls=n_controls,
        all_haplotypes=both_haplotypes(sample_IDs),
        cases_haplotypes=both_haplotypes(cases_IDs),
        controls_haplotypes=both_haplotypes(controls_IDs),
    )
Read data from a `.phe` file and construct a `UKBPhenotypeObject`.
Returns:
UKBPhenotypeObject: A UKB phenotype object instance.
class
MultiPhenTabularReader(snputils.phenotype.io.read.base.PhenotypeBaseReader):
class MultiPhenTabularReader(PhenotypeBaseReader):
    """
    A class for reading data from a tabular file (`.xlsx`, `.csv`, `.tsv`, `.map`, `.smap`, `.phen`) and
    constructing a `MultiPhenotypeObject`.
    """
    def __init__(self, file: Union[str, Path]) -> None:
        """
        Args:
            file (str or pathlib.Path):
                Path to the file containing phenotype data.
                Accepted formats = [`.xlsx`, `.csv`, `.tsv`, `.map`, `.smap`, `.phen`].
        """
        self.__file = file

    @property
    def file(self) -> Path:
        """
        Retrieve `file`.

        Returns:
            pathlib.Path:
                Path to the file containing phenotype data.
                Accepted formats = [`.xlsx`, `.csv`, `.tsv`, `.map`, `.smap`, `.phen`].
        """
        return self.__file

    def read(
        self,
        samples_idx: int = 0,
        phen_names: Optional[List] = None,
        sep: str = ',',
        header: Optional[int] = 0,
        drop: bool = False
    ) -> 'MultiPhenotypeObject':
        """
        Read data from `file` and construct a `MultiPhenotypeObject`.

        The reader is chosen from the file extension. After loading, the column at
        `samples_idx` is renamed to `samples` and moved to the first position.

        Args:
            samples_idx (int, default=0): Index of the column containing sample identifiers.
                Default is 0, assuming the first column contains sample identifiers.
            phen_names (list of str, optional): List of phenotype column names. If provided,
                the phenotype columns (every column after `samples`) are renamed, in order,
                to the specified names.
            sep (str, default=','): The delimiter for separating values in `.csv`, `.map`
                and `.smap` files. `.tsv` files are always read as tab-separated.
            header (int, default=0): Row index to use as the column names for `.xlsx`,
                `.csv`, `.tsv`, `.map` and `.smap` files. By default, uses the first row
                (`header=0`). Set to `None` if the file has no header row. For `.phen`
                files the first line is always skipped.
            drop (bool, default=False): If True, removes columns not listed in `phen_names`
                (except the samples column) before renaming.

        Returns:
            MultiPhenotypeObject:
                A multi-phenotype object instance.

        Raises:
            ValueError: If the file extension is not supported, if a non-blank `.phen`
                line has fewer than two fields, or if the number of phenotype columns
                does not match the length of `phen_names`.
        """
        # Determine the file extension
        file_extension = os.path.splitext(self.file)[1]

        log.info(f"Reading '{file_extension}' file from '{self.file}'...")

        # Read file based on its extension
        if file_extension == '.xlsx':
            phen_df = pd.read_excel(self.file, header=header, index_col=None)
        elif file_extension in ('.csv', '.map', '.smap'):
            phen_df = pd.read_csv(self.file, sep=sep, header=header)
        elif file_extension == '.tsv':
            phen_df = pd.read_csv(self.file, sep='\t', header=header)
        elif file_extension == '.phen':
            with open(self.file, 'r') as f:
                contents = f.readlines()
            # Map sample ID -> phenotype; the first line is treated as a header.
            # For a repeated sample ID the last value wins.
            phen_dict = {}
            for line_no, line in enumerate(contents[1:], start=2):
                fields = line.split()
                if not fields:
                    # Blank line (e.g. trailing newline at end of file)
                    continue
                if len(fields) < 2:
                    raise ValueError(
                        f"Line {line_no} of '{self.file}' has no phenotype value: {line.rstrip()!r}"
                    )
                phen_dict[fields[0]] = fields[1]
            phen_df = pd.DataFrame({'samples': list(phen_dict.keys()), 'phenotype': list(phen_dict.values())})
        else:
            raise ValueError(f"Unsupported file extension {file_extension}. Supported extensions are: "
                             '[".xlsx", ".csv", ".tsv", ".map", ".smap", ".phen"]')

        # Ensure the sample IDs column is labeled 'samples'
        phen_df = phen_df.rename(columns={phen_df.columns[samples_idx]: 'samples'})

        if samples_idx != 0:
            # Reorder columns to place 'samples' as the first column
            cols = ['samples'] + [col for col in phen_df.columns if col != 'samples']
            phen_df = phen_df[cols]

        # Process phenotype columns if `phen_names` is provided
        if phen_names is not None:
            if drop:
                # Keep only the samples column and the columns named in `phen_names`
                keep = {'samples', *phen_names}
                phen_df = phen_df.drop(columns=[col for col in phen_df.columns if col not in keep])
                # NOTE(review): the rename below is positional, so with `drop=True`
                # the order of `phen_names` is assumed to match the file's column
                # order - confirm against callers.

            # Rename phenotype columns if length matches
            phenotype_col_count = phen_df.shape[1] - 1  # Exclude samples column
            if phenotype_col_count != len(phen_names):
                raise ValueError(f"Mismatch between number of phenotype columns ({phenotype_col_count}) "
                                 f"and length of `phen_names` ({len(phen_names)}).")
            # Assign a new Index rather than writing into `columns.values`:
            # an Index is immutable and in-place writes can leave stale lookup caches.
            phen_df.columns = [phen_df.columns[0], *phen_names]

        return MultiPhenotypeObject(phen_df=phen_df)
A class for reading data from a tabular file (`.xlsx`, `.csv`, `.tsv`, `.map`, `.smap`, `.phen`) and constructing a `MultiPhenotypeObject`.
MultiPhenTabularReader(file: Union[str, pathlib._local.Path])
def __init__(self, file: Union[str, Path]) -> None:
    """
    Remember the location of the phenotype file; nothing is read until `read` is called.

    Args:
        file (str or pathlib.Path):
            Path to the file containing phenotype data.
            Accepted formats = [`.xlsx`, `.csv`, `.tsv`, `.map`, `.smap`, `.phen`].
    """
    # Kept private; exposed read-only through the `file` property.
    self.__file = file
Arguments:
- file (str or pathlib.Path): Path to the file containing phenotype data. Accepted formats = [`.xlsx`, `.csv`, `.tsv`, `.map`, `.smap`, `.phen`].
file: pathlib._local.Path
@property
def file(self) -> Path:
    """
    Path of the phenotype file this reader was created with.

    Returns:
        pathlib.Path:
            Path to the file containing phenotype data.
            Accepted formats = [`.xlsx`, `.csv`, `.tsv`, `.map`, `.smap`, `.phen`].
    """
    # Read-only view of the value stored by `__init__`.
    return self.__file
Retrieve file
.
Returns:
pathlib.Path: Path to the file containing phenotype data. Accepted formats = [`.xlsx`, `.csv`, `.tsv`, `.map`, `.smap`, `.phen`].
def
read( self, samples_idx: int = 0, phen_names: Optional[List] = None, sep: str = ',', header: int = 0, drop: bool = False) -> snputils.phenotype.genobj.MultiPhenotypeObject:
def read(
    self,
    samples_idx: int = 0,
    phen_names: Optional[List] = None,
    sep: str = ',',
    header: Optional[int] = 0,
    drop: bool = False
) -> 'MultiPhenotypeObject':
    """
    Read data from `file` and construct a `MultiPhenotypeObject`.

    The reader is chosen from the file extension. After loading, the column at
    `samples_idx` is renamed to `samples` and moved to the first position.

    Args:
        samples_idx (int, default=0): Index of the column containing sample identifiers.
            Default is 0, assuming the first column contains sample identifiers.
        phen_names (list of str, optional): List of phenotype column names. If provided,
            the phenotype columns (every column after `samples`) are renamed, in order,
            to the specified names.
        sep (str, default=','): The delimiter for separating values in `.csv`, `.map`
            and `.smap` files. `.tsv` files are always read as tab-separated.
        header (int, default=0): Row index to use as the column names for `.xlsx`,
            `.csv`, `.tsv`, `.map` and `.smap` files. By default, uses the first row
            (`header=0`). Set to `None` if the file has no header row. For `.phen`
            files the first line is always skipped.
        drop (bool, default=False): If True, removes columns not listed in `phen_names`
            (except the samples column) before renaming.

    Returns:
        MultiPhenotypeObject:
            A multi-phenotype object instance.

    Raises:
        ValueError: If the file extension is not supported, if a non-blank `.phen`
            line has fewer than two fields, or if the number of phenotype columns
            does not match the length of `phen_names`.
    """
    # Determine the file extension
    file_extension = os.path.splitext(self.file)[1]

    log.info(f"Reading '{file_extension}' file from '{self.file}'...")

    # Read file based on its extension
    if file_extension == '.xlsx':
        phen_df = pd.read_excel(self.file, header=header, index_col=None)
    elif file_extension in ('.csv', '.map', '.smap'):
        phen_df = pd.read_csv(self.file, sep=sep, header=header)
    elif file_extension == '.tsv':
        phen_df = pd.read_csv(self.file, sep='\t', header=header)
    elif file_extension == '.phen':
        with open(self.file, 'r') as f:
            contents = f.readlines()
        # Map sample ID -> phenotype; the first line is treated as a header.
        # For a repeated sample ID the last value wins.
        phen_dict = {}
        for line_no, line in enumerate(contents[1:], start=2):
            fields = line.split()
            if not fields:
                # Blank line (e.g. trailing newline at end of file)
                continue
            if len(fields) < 2:
                raise ValueError(
                    f"Line {line_no} of '{self.file}' has no phenotype value: {line.rstrip()!r}"
                )
            phen_dict[fields[0]] = fields[1]
        phen_df = pd.DataFrame({'samples': list(phen_dict.keys()), 'phenotype': list(phen_dict.values())})
    else:
        raise ValueError(f"Unsupported file extension {file_extension}. Supported extensions are: "
                         '[".xlsx", ".csv", ".tsv", ".map", ".smap", ".phen"]')

    # Ensure the sample IDs column is labeled 'samples'
    phen_df = phen_df.rename(columns={phen_df.columns[samples_idx]: 'samples'})

    if samples_idx != 0:
        # Reorder columns to place 'samples' as the first column
        cols = ['samples'] + [col for col in phen_df.columns if col != 'samples']
        phen_df = phen_df[cols]

    # Process phenotype columns if `phen_names` is provided
    if phen_names is not None:
        if drop:
            # Keep only the samples column and the columns named in `phen_names`
            keep = {'samples', *phen_names}
            phen_df = phen_df.drop(columns=[col for col in phen_df.columns if col not in keep])
            # NOTE(review): the rename below is positional, so with `drop=True`
            # the order of `phen_names` is assumed to match the file's column
            # order - confirm against callers.

        # Rename phenotype columns if length matches
        phenotype_col_count = phen_df.shape[1] - 1  # Exclude samples column
        if phenotype_col_count != len(phen_names):
            raise ValueError(f"Mismatch between number of phenotype columns ({phenotype_col_count}) "
                             f"and length of `phen_names` ({len(phen_names)}).")
        # Assign a new Index rather than writing into `columns.values`:
        # an Index is immutable and in-place writes can leave stale lookup caches.
        phen_df.columns = [phen_df.columns[0], *phen_names]

    return MultiPhenotypeObject(phen_df=phen_df)
Read data from `file` and construct a `MultiPhenotypeObject`.
Arguments:
- samples_idx (int, default=0): Index of the column containing sample identifiers. Default is 0, assuming the first column contains sample identifiers.
- phen_names (list of str, optional): List of phenotype column names. If provided, these columns will be renamed to the specified names.
- sep (str, default=','): The delimiter for separating values in `.csv`, `.map`, or `.smap` files (`.tsv` files are always read as tab-separated). Default is ','.
- header (int, default=0): Row index to use as the column names. By default, uses the first row (`header=0`). Set to `None` if column names are provided explicitly.
- drop (bool, default=False): If True, removes columns not listed in `phen_names` (except the samples column).
Returns:
MultiPhenotypeObject: A multi-phenotype object instance.