snputils.phenotype.io

1from .read import UKBPhenReader
2from .read import MultiPhenTabularReader
3
4__all__ = ['UKBPhenReader', 'MultiPhenTabularReader']
class UKBPhenReader(snputils.phenotype.io.read.base.PhenotypeBaseReader):
13class UKBPhenReader(PhenotypeBaseReader):
14    """
15    A class for reading data from a `.phe` file and constructing a `UKBPhenotypeObject`.
16    """
17    def __init__(self, file: Union[str, Path]) -> None:
18        """
19        Args:
20            file (str or pathlib.Path): 
21                Path to the `.phe` file containing UKB phenotype data.
22        """
23        self._file = file
24
25    @property
26    def file(self) -> Path:
27        """
28        Retrieve `file`.
29
30        Returns:
31            pathlib.Path: 
32                Path to the `.phe` file containing UKB phenotype data.
33        """
34        return self.__file
35    
36    def read(self) -> 'UKBPhenotypeObject':
37        """
38        Read data from a `.phe ` file and construct a `UKBPhenotypeObject`.
39
40        Returns:
41            UKBPhenotypeObject: 
42                A UKB phenotype object instance.
43        """
44        log.info(f"Reading .phe file from '{self.file}'...")
45        
46        # Load the `.phe` file with specified column names and data types
47        phen_df = pd.read_csv(
48            self.file,
49            header=None,
50            delim_whitespace=True,
51            names=["FID", "IID", "status"],
52            dtype={"FID": str, "IID": str, "status": int},
53        )
54        
55        # Extract sample IDs for cases based on status
56        cases_IDs = list(phen_df[phen_df["status"] == 2]["FID"])
57        n_cases = len(cases_IDs)
58        if n_cases == 0:
59            raise ValueError("No case data available.")
60        
61        # Extract sample IDs for controls based on status
62        controls_IDs = list(phen_df[phen_df["status"] == 1]["FID"])
63        n_controls = len(controls_IDs)
64        if n_controls == 0:
65            raise ValueError("No control data available.")
66        
67        # Verify the sample count integrity
68        sample_IDs = phen_df["FID"].tolist()
69        n_samples = len(sample_IDs)
70        if n_samples != (n_cases + n_controls):
71            raise ValueError(
72                "Total sample count does not match the combined count of cases and controls. "
73                f"Expected {n_cases + n_controls}; found {n_samples}."
74            )
75        
76        # Generate haplotypes for cases, controls, and all samples
77        cases_haplotypes = [f"{case}.0" for case in cases_IDs] + [f"{case}.1" for case in cases_IDs]
78        controls_haplotypes = [f"{control}.0" for control in controls_IDs] + [f"{control}.1" for control in controls_IDs]
79        all_haplotypes = [f"{sample}.0" for sample in sample_IDs] + [f"{sample}.1" for sample in sample_IDs]
80        
81        return UKBPhenotypeObject(samples = sample_IDs,
82                                  n_samples = n_samples,
83                                  cases = cases_IDs,
84                                  n_cases = n_cases,
85                                  controls = controls_IDs,
86                                  n_controls = n_controls,
87                                  all_haplotypes = all_haplotypes,
88                                  cases_haplotypes = cases_haplotypes,
89                                  controls_haplotypes = controls_haplotypes
90                                 )

A class for reading data from a .phe file and constructing a UKBPhenotypeObject.

UKBPhenReader(file: Union[str, pathlib._local.Path])
17    def __init__(self, file: Union[str, Path]) -> None:
18        """
19        Args:
20            file (str or pathlib.Path): 
21                Path to the `.phe` file containing UKB phenotype data.
22        """
23        self._file = file
Arguments:
  • file (str or pathlib.Path): Path to the .phe file containing UKB phenotype data.
file: pathlib._local.Path
25    @property
26    def file(self) -> Path:
27        """
28        Retrieve `file`.
29
30        Returns:
31            pathlib.Path: 
32                Path to the `.phe` file containing UKB phenotype data.
33        """
34        return self.__file

Retrieve file.

Returns:

pathlib.Path: Path to the .phe file containing UKB phenotype data.

def read(self) -> snputils.phenotype.genobj.UKBPhenotypeObject:
36    def read(self) -> 'UKBPhenotypeObject':
37        """
38        Read data from a `.phe ` file and construct a `UKBPhenotypeObject`.
39
40        Returns:
41            UKBPhenotypeObject: 
42                A UKB phenotype object instance.
43        """
44        log.info(f"Reading .phe file from '{self.file}'...")
45        
46        # Load the `.phe` file with specified column names and data types
47        phen_df = pd.read_csv(
48            self.file,
49            header=None,
50            delim_whitespace=True,
51            names=["FID", "IID", "status"],
52            dtype={"FID": str, "IID": str, "status": int},
53        )
54        
55        # Extract sample IDs for cases based on status
56        cases_IDs = list(phen_df[phen_df["status"] == 2]["FID"])
57        n_cases = len(cases_IDs)
58        if n_cases == 0:
59            raise ValueError("No case data available.")
60        
61        # Extract sample IDs for controls based on status
62        controls_IDs = list(phen_df[phen_df["status"] == 1]["FID"])
63        n_controls = len(controls_IDs)
64        if n_controls == 0:
65            raise ValueError("No control data available.")
66        
67        # Verify the sample count integrity
68        sample_IDs = phen_df["FID"].tolist()
69        n_samples = len(sample_IDs)
70        if n_samples != (n_cases + n_controls):
71            raise ValueError(
72                "Total sample count does not match the combined count of cases and controls. "
73                f"Expected {n_cases + n_controls}; found {n_samples}."
74            )
75        
76        # Generate haplotypes for cases, controls, and all samples
77        cases_haplotypes = [f"{case}.0" for case in cases_IDs] + [f"{case}.1" for case in cases_IDs]
78        controls_haplotypes = [f"{control}.0" for control in controls_IDs] + [f"{control}.1" for control in controls_IDs]
79        all_haplotypes = [f"{sample}.0" for sample in sample_IDs] + [f"{sample}.1" for sample in sample_IDs]
80        
81        return UKBPhenotypeObject(samples = sample_IDs,
82                                  n_samples = n_samples,
83                                  cases = cases_IDs,
84                                  n_cases = n_cases,
85                                  controls = controls_IDs,
86                                  n_controls = n_controls,
87                                  all_haplotypes = all_haplotypes,
88                                  cases_haplotypes = cases_haplotypes,
89                                  controls_haplotypes = controls_haplotypes
90                                 )

Read data from a .phe file and construct a UKBPhenotypeObject.

Returns:

UKBPhenotypeObject: A UKB phenotype object instance.

class MultiPhenTabularReader(snputils.phenotype.io.read.base.PhenotypeBaseReader):
 14class MultiPhenTabularReader(PhenotypeBaseReader):
 15    """
 16    A class for reading data from a tabular file (`.xlsx`, `.csv`, `.map`, `.smap`, `.phen`) and 
 17    constructing a `MultiPhenotypeObject`.
 18    """
 19    def __init__(self, file: Union[str, Path]) -> None:
 20        """
 21        Args:
 22            file (str or pathlib.Path): 
 23                Path to the file containing phenotype data. Accepted formats = [`.xlsx`, `.csv`, `.map`, `.smap`, `.phen`].
 24        """
 25        self.__file = file
 26
 27    @property
 28    def file(self) -> Path:
 29        """
 30        Retrieve `file`.
 31
 32        Returns:
 33            pathlib.Path: 
 34                Path to the file containing phenotype data. Accepted formats = [`.xlsx`, `.csv`, `.map`, `.smap`, `.phen`].
 35        """
 36        return self.__file
 37    
 38    def read(
 39            self, 
 40            samples_idx: int = 0, 
 41            phen_names: Optional[List] = None, 
 42            sep: str = ',', 
 43            header: int = 0, 
 44            drop: bool = False
 45        ) -> 'MultiPhenotypeObject':
 46        """
 47        Read data from `file` and construct a `MultiPhenotypeObject`.
 48
 49        Args:
 50            samples_idx (int, default=0): Index of the column containing sample identifiers.
 51                Default is 0, assuming the first column contains sample identifiers.
 52            phen_names (list of str, optional): List of phenotype column names. If provided, 
 53                these columns will be renamed to the specified names.
 54            sep (str, default=','): The delimiter for separating values in `.csv`, `.tsv`, 
 55                or `.map` files. Default is ','.
 56            header (int, default=0): Row index to use as the column names. By default, 
 57                uses the first row (`header=0`). Set to `None` if column names are provided 
 58                explicitly.
 59            drop (bool, default=False): If True, removes columns not listed in `phen_names` 
 60                (except the samples column).
 61
 62        Returns:
 63            MultiPhenotypeObject: 
 64                A multi-phenotype object instance.
 65        """
 66        # Determine the file extension
 67        file_extension = os.path.splitext(self.file)[1]
 68        
 69        log.info(f"Reading '{file_extension}' file from '{self.file}'...")
 70        
 71        # Read file based on its extension
 72        if file_extension == '.xlsx': 
 73            phen_df = pd.read_excel(self.file, header=0, index_col=None)
 74        elif file_extension == '.csv':
 75            phen_df = pd.read_csv(self.file, sep=sep, header=header)
 76        elif file_extension in ['.map', '.smap']:
 77            phen_df = pd.read_csv(self.file, sep=sep, header=header)
 78        elif file_extension == '.tsv':
 79            phen_df = pd.read_csv(self.file, sep='\t')
 80        elif file_extension == '.phen':
 81            with open(self.file, 'r') as f:
 82                contents = f.readlines()
 83            # Convert .phen file content to a dictionary
 84            phen_dict = {line.split()[0]: line.split()[1].strip() for line in contents[1:]}
 85            phen_df = pd.DataFrame({'samples': list(phen_dict.keys()), 'phenotype': list(phen_dict.values())})        
 86        else:
 87            raise ValueError(f"Unsupported file extension {file_extension}. Supported extensions are: "
 88                             '[".xlsx", ".csv", ".tsv", ".map", ".smap", ".phen"]')
 89        
 90        # Ensure the sample IDs column is labeled 'samples'
 91        phen_df.rename(columns={phen_df.columns[samples_idx]: 'samples'}, inplace=True)
 92
 93        if samples_idx != 0:
 94            # Reorder columns to place 'samples' as the first column
 95            cols = ['samples'] + [col for col in phen_df.columns if col != 'samples']
 96            phen_df = phen_df[cols]
 97        
 98        # Process phenotype columns if `phen_names` is provided
 99        if phen_names is not None:
100            if drop:
101                # Drop columns not listed in `phen_names` or not the samples column
102                non_phen_columns = list(set(phen_df.columns) - set(['samples']+phen_names))
103                phen_df = phen_df.drop(non_phen_columns, axis=1)
104            
105            # Rename phenotype columns if length matches
106            phenotype_col_count = phen_df.shape[1] - 1  # Exclude samples column
107            if phenotype_col_count == len(phen_names):
108                phen_df.columns.values[1:] = phen_names
109            else:
110                raise ValueError(f"Mismatch between number of phenotype columns ({phenotype_col_count}) "
111                                 f"and length of `phen_names` ({len(phen_names)}).")
112        
113        return MultiPhenotypeObject(phen_df=phen_df)

A class for reading data from a tabular file (.xlsx, .csv, .map, .smap, .phen) and constructing a MultiPhenotypeObject.

MultiPhenTabularReader(file: Union[str, pathlib._local.Path])
19    def __init__(self, file: Union[str, Path]) -> None:
20        """
21        Args:
22            file (str or pathlib.Path): 
23                Path to the file containing phenotype data. Accepted formats = [`.xlsx`, `.csv`, `.map`, `.smap`, `.phen`].
24        """
25        self.__file = file
Arguments:
  • file (str or pathlib.Path): Path to the file containing phenotype data. Accepted formats = [.xlsx, .csv, .map, .smap, .phen].
file: pathlib._local.Path
27    @property
28    def file(self) -> Path:
29        """
30        Retrieve `file`.
31
32        Returns:
33            pathlib.Path: 
34                Path to the file containing phenotype data. Accepted formats = [`.xlsx`, `.csv`, `.map`, `.smap`, `.phen`].
35        """
36        return self.__file

Retrieve file.

Returns:

pathlib.Path: Path to the file containing phenotype data. Accepted formats = [.xlsx, .csv, .map, .smap, .phen].

def read( self, samples_idx: int = 0, phen_names: Optional[List] = None, sep: str = ',', header: int = 0, drop: bool = False) -> snputils.phenotype.genobj.MultiPhenotypeObject:
 38    def read(
 39            self, 
 40            samples_idx: int = 0, 
 41            phen_names: Optional[List] = None, 
 42            sep: str = ',', 
 43            header: int = 0, 
 44            drop: bool = False
 45        ) -> 'MultiPhenotypeObject':
 46        """
 47        Read data from `file` and construct a `MultiPhenotypeObject`.
 48
 49        Args:
 50            samples_idx (int, default=0): Index of the column containing sample identifiers.
 51                Default is 0, assuming the first column contains sample identifiers.
 52            phen_names (list of str, optional): List of phenotype column names. If provided, 
 53                these columns will be renamed to the specified names.
 54            sep (str, default=','): The delimiter for separating values in `.csv`, `.tsv`, 
 55                or `.map` files. Default is ','.
 56            header (int, default=0): Row index to use as the column names. By default, 
 57                uses the first row (`header=0`). Set to `None` if column names are provided 
 58                explicitly.
 59            drop (bool, default=False): If True, removes columns not listed in `phen_names` 
 60                (except the samples column).
 61
 62        Returns:
 63            MultiPhenotypeObject: 
 64                A multi-phenotype object instance.
 65        """
 66        # Determine the file extension
 67        file_extension = os.path.splitext(self.file)[1]
 68        
 69        log.info(f"Reading '{file_extension}' file from '{self.file}'...")
 70        
 71        # Read file based on its extension
 72        if file_extension == '.xlsx': 
 73            phen_df = pd.read_excel(self.file, header=0, index_col=None)
 74        elif file_extension == '.csv':
 75            phen_df = pd.read_csv(self.file, sep=sep, header=header)
 76        elif file_extension in ['.map', '.smap']:
 77            phen_df = pd.read_csv(self.file, sep=sep, header=header)
 78        elif file_extension == '.tsv':
 79            phen_df = pd.read_csv(self.file, sep='\t')
 80        elif file_extension == '.phen':
 81            with open(self.file, 'r') as f:
 82                contents = f.readlines()
 83            # Convert .phen file content to a dictionary
 84            phen_dict = {line.split()[0]: line.split()[1].strip() for line in contents[1:]}
 85            phen_df = pd.DataFrame({'samples': list(phen_dict.keys()), 'phenotype': list(phen_dict.values())})        
 86        else:
 87            raise ValueError(f"Unsupported file extension {file_extension}. Supported extensions are: "
 88                             '[".xlsx", ".csv", ".tsv", ".map", ".smap", ".phen"]')
 89        
 90        # Ensure the sample IDs column is labeled 'samples'
 91        phen_df.rename(columns={phen_df.columns[samples_idx]: 'samples'}, inplace=True)
 92
 93        if samples_idx != 0:
 94            # Reorder columns to place 'samples' as the first column
 95            cols = ['samples'] + [col for col in phen_df.columns if col != 'samples']
 96            phen_df = phen_df[cols]
 97        
 98        # Process phenotype columns if `phen_names` is provided
 99        if phen_names is not None:
100            if drop:
101                # Drop columns not listed in `phen_names` or not the samples column
102                non_phen_columns = list(set(phen_df.columns) - set(['samples']+phen_names))
103                phen_df = phen_df.drop(non_phen_columns, axis=1)
104            
105            # Rename phenotype columns if length matches
106            phenotype_col_count = phen_df.shape[1] - 1  # Exclude samples column
107            if phenotype_col_count == len(phen_names):
108                phen_df.columns.values[1:] = phen_names
109            else:
110                raise ValueError(f"Mismatch between number of phenotype columns ({phenotype_col_count}) "
111                                 f"and length of `phen_names` ({len(phen_names)}).")
112        
113        return MultiPhenotypeObject(phen_df=phen_df)

Read data from file and construct a MultiPhenotypeObject.

Arguments:
  • samples_idx (int, default=0): Index of the column containing sample identifiers. Default is 0, assuming the first column contains sample identifiers.
  • phen_names (list of str, optional): List of phenotype column names. If provided, these columns will be renamed to the specified names.
  • sep (str, default=','): The delimiter for separating values in .csv, .tsv, or .map files. Default is ','.
  • header (int, default=0): Row index to use as the column names. By default, uses the first row (header=0). Set to None if column names are provided explicitly.
  • drop (bool, default=False): If True, removes columns not listed in phen_names (except the samples column).
Returns:

MultiPhenotypeObject: A multi-phenotype object instance.