"""General utility functions."""
import os
import re
import sys
from os.path import exists
import subprocess
import shutil
import getpass
from datetime import datetime
import numpy as np
import pandas as pd
# type hint imports
from beartype.typing import Any, TYPE_CHECKING, Optional, Union, Sequence
from beartype import beartype
import numpy.typing as npt
if TYPE_CHECKING:
import rpy2.rinterface_lib.sexp
# ------------------ Logging about run ----------------- #
[docs]
def get_user() -> str:
"""
Get the name of the current user.
Returns
-------
str
The name of the current user.
"""
try:
username = getpass.getuser()
except Exception:
username = "unknown"
return username
[docs]
def get_datetime() -> str:
"""
Get a string with the current date and time for logging.
Returns
-------
str
A string with the current date and time in the format dd/mm/YY H:M:S
"""
now = datetime.now()
dt_string = now.strftime("%d/%m/%Y %H:%M:%S") # dd/mm/YY H:M:S
return dt_string
# ------------------ Packages and tools ----------------- #
[docs]
def get_package_versions() -> dict[str, str]:
"""
Receive a dictionary of currently installed python packages and versions.
Returns
-------
dict[str, str]
A dict in the form:
`{"package1": "1.2.1", "package2":"4.0.1", (...)}`
"""
# Import freeze
try:
from pip._internal.operations import freeze
except ImportError: # pip < 10.0
from pip.operations import freeze
# Get list of packages and versions with freeze
package_list = freeze.freeze()
package_dict = {} # dict for collecting versions
for s in package_list:
try:
name, version = re.split("==| @ ", s)
package_dict[name] = version
except Exception:
print(f"Error reading version for package: {s}")
return package_dict
[docs]
@beartype
def get_binary_path(tool: str) -> str:
"""
Get path to a binary commandline tool.
Looks either in the local dir, on path or in the dir of the executing python binary.
Parameters
----------
tool : str
Name of the commandline tool to be found.
Returns
-------
str
Full path to the tool.
Raises
------
ValueError
If executable is not found.
"""
python_dir = os.path.dirname(sys.executable)
if os.path.exists(tool):
tool_path = f"./{tool}"
else:
# Check if tool is available on path
tool_path = shutil.which(tool)
if tool_path is None:
# Search for tool within same folder as python (e.g. in an environment)
python_dir = os.path.dirname(sys.executable)
tool_path = shutil.which(tool, path=python_dir)
# Check that tool is executable
if tool_path is None or shutil.which(tool_path) is None:
raise ValueError(f"Could not find an executable for {tool} on path.")
return tool_path
[docs]
def run_cmd(cmd: str) -> None:
"""
Run a commandline command.
Parameters
----------
cmd : str
Command to be run.
Raises
------
subprocess.CalledProcessError
If command has an error.
"""
try:
subprocess.check_call(cmd, shell=True)
print(f"Command '{cmd}' ran successfully!")
except subprocess.CalledProcessError as e:
# print(f"Error running command '{cmd}': {e}")
if e.output is not None:
print(f"Command standard output: {e.output.decode('utf-8')}")
if e.stderr is not None:
print(f"Command standard error: {e.stderr.decode('utf-8')}")
raise e
#####################################################################
# R setup #
#####################################################################
[docs]
def setup_R(r_home: Optional[str] = None) -> None:
"""
Add R installation for rpy2 use.
Parameters
----------
r_home : Optional[str], default None
Path to the R home directory. If None will construct path based on location of python executable.
E.g for ".conda/scanpy/bin/python" will look at ".conda/scanpy/lib/R"
Raises
------
Exception
If path to R is invalid.
"""
# Set R installation path
if not r_home:
r_home = os.path.join(sys.executable.split('/bin/')[0], 'lib/R')
if not exists(r_home):
raise Exception(f'Path to R installation does not exist! Make sure R is installed. {r_home}')
from rpy2.rinterface_lib import openrlib
os.environ['R_HOME'] = r_home
openrlib.R_HOME = r_home
@beartype
def _none2null(none_obj: None) -> "rpy2.rinterface_lib.sexp.NULLType":
"""
rpy2 converter that translates python 'None' to R 'NULL'.
Intended to be added as a rpy2 converter object.
Parameters
----------
none_obj : None
None object to convert to r"NULL".
Returns
-------
rpy2.rinterface_lib.sexp.NULLType
R NULL object.
"""
# See https://stackoverflow.com/questions/65783033/how-to-convert-none-to-r-null
from rpy2.robjects import r
return r("NULL")
# ----------------- List functions ---------------- #
[docs]
@beartype
def split_list(lst: Sequence[Any], n: int) -> list[Sequence[Any]]:
"""
Split list into n chunks.
Parameters
----------
lst : Sequence[Any]
Sequence to be chunked
n : int
Number of chunks.
Returns
-------
list[Sequence[Any]]
List of Sequences (chunks).
"""
chunks = []
for i in range(0, n):
chunks.append(lst[i::n])
return chunks
[docs]
@beartype
def split_list_size(lst: list[Any], max_size: int) -> list[list[Any]]:
"""
Split list into chunks of max_size.
Parameters
----------
lst : list[Any]
List to be chunked
max_size : int
Max size of chunks.
Returns
-------
list[list[Any]]
List of lists (chunks).
"""
chunks = []
for i in range(0, len(lst), max_size):
chunks.append(lst[i:i + max_size])
return chunks
[docs]
@beartype
def write_list_file(lst: list[Any], path: str) -> None:
"""
Write a list to a file with one element per line.
Parameters
----------
lst : list[Any]
A list of values/strings to write to file
path : str
Path to output file.
"""
lst = [str(s) for s in lst]
s = "\n".join(lst)
with open(path, "w") as f:
f.write(s)
[docs]
def read_list_file(path: str) -> list[str]:
"""
Read a list from a file with one element per line.
Parameters
----------
path : str
Path to read file from.
Returns
-------
list[str]
List of strings read from file.
"""
f = open(path)
lst = f.read().splitlines() # get lines without "\n"
f.close()
return lst
# ----------------- String functions ---------------- #
[docs]
@beartype
def clean_flanking_strings(list_of_strings: list[str]) -> list[str]:
"""
Remove common suffix and prefix from a list of strings.
E.g. running the function on ['path/a.txt', 'path/b.txt', 'path/c.txt'] would yield ['a', 'b', 'c'].
Parameters
----------
list_of_strings : list[str]
List of strings.
Returns
-------
list[str]
List of strings without common suffix and prefix
"""
suffix = longest_common_suffix(list_of_strings)
prefix = os.path.commonprefix(list_of_strings)
list_of_strings_clean = [remove_prefix(s, prefix) for s in list_of_strings]
list_of_strings_clean = [remove_suffix(s, suffix) for s in list_of_strings_clean]
return list_of_strings_clean
[docs]
@beartype
def longest_common_suffix(list_of_strings: list[str]) -> str:
"""
Find the longest common suffix of a list of strings.
Parameters
----------
list_of_strings : list[str]
List of strings.
Returns
-------
str
Longest common suffix of the list of strings.
"""
reversed_strings = [s[::-1] for s in list_of_strings]
reversed_lcs = os.path.commonprefix(reversed_strings)
lcs = reversed_lcs[::-1]
return lcs
[docs]
def remove_prefix(s: str, prefix: str) -> str:
"""
Remove prefix from a string.
Parameters
----------
s : str
String to be processed.
prefix : str
Prefix to be removed.
Returns
-------
str
String without prefix.
"""
return s[len(prefix):] if s.startswith(prefix) else s
[docs]
def remove_suffix(s: str, suffix: str) -> str:
"""
Remove suffix from a string.
Parameters
----------
s : str
String to be processed.
suffix : str
Suffix to be removed.
Returns
-------
str
String without suffix.
"""
return s[:-len(suffix)] if s.endswith(suffix) else s
[docs]
@beartype
def sanitize_string(s: str, char_list: list[str], replace: str = "_") -> str:
"""
Replace every occurrence of given substrings.
Parameters
----------
s : str
String to sanitize
char_list : list[str]
Strings that should be replaced.
replace : str, default "_"
Replacement of substrings.
Returns
-------
str
Sanitized string.
"""
for char in char_list:
s = s.replace(char, replace)
return s
[docs]
@beartype
def identify_columns(df: pd.DataFrame,
regex: Union[list[str], str]) -> list[str]:
"""
Get columns from pd.DataFrame that match the given regex.
Parameters
----------
df : pd.DataFrame
Pandas dataframe to be checked.
regex : Union(list[str], str)
List of multiple regex or one regex as string.
Returns
-------
list[str]
List of column names that match one of the provided regex.
"""
if isinstance(regex, list):
regex = "(" + ")|(".join(regex) + ")"
return df.filter(regex=(regex)).columns.to_list()
[docs]
@beartype
def scale_values(array: npt.ArrayLike, mini: int | float, maxi: int | float) -> np.ndarray:
"""Small utility to scale values in array to a given range.
Parameters
----------
array : npt.ArrayLike
Array to scale.
mini : int | float
Minimum value of the scale.
maxi : int | float
Maximum value of the scale.
Returns
-------
np.ndarray
Scaled array values.
"""
val_range = array.max() - array.min()
a = (array - array.min()) / val_range
return a * (maxi - mini) + mini