Source code for sctoolbox.tools.peak_annotation

"""Tools for peak annotation required by scATAC-seq."""
import pandas as pd
import numpy as np
import copy
import os
import multiprocessing as mp
import psutil
import subprocess
import scanpy as sc

from beartype.typing import Optional, Union, Tuple, Any, Literal
from beartype import beartype

import sctoolbox.utils as utils
import sctoolbox.utils.decorator as deco
from sctoolbox._settings import settings
logger = settings.logger


#################################################################################
# ------------------------ Uropa annotation of peaks -------------------------- #
#################################################################################

@deco.log_anndata
@beartype
def annotate_adata(adata: sc.AnnData,
                   gtf: str,
                   config: Optional[dict[str, Any]] = None,
                   best: bool = True,
                   threads: int = 1,
                   coordinate_cols: Optional[list[str]] = None,
                   temp_dir: str = "",
                   remove_temp: bool = True,
                   inplace: bool = True) -> Optional[sc.AnnData]:
    """
    Annotate adata .var features with genes from .gtf using UROPA [1]_.

    The function assumes that adata.var contains genomic coordinates in the first three columns,
    e.g. "chromosome", "start", "end". If other columns should be used, adjust the names via 'coordinate_cols'.

    Parameters
    ----------
    adata : sc.AnnData
        The anndata object containing features to annotate.
    gtf : str
        Path to .gtf file containing genomic elements for annotation.
    config : Optional[dict[str, Any]], default None
        A dictionary indicating how regions should be annotated.
        Default (None) is to annotate the feature 'gene' within -10000;1000bp of the gene start.
        See 'Examples' for how to set up a custom configuration dictionary.
    best : bool, default True
        Whether to return the best annotation or all valid annotations.
    threads : int, default 1
        Number of threads to use for multiprocessing.
    coordinate_cols : Optional[list[str]], default None
        A list of column names in the regions DataFrame that contain the chromosome, start and end coordinates.
        If None, the first three columns are taken.
    temp_dir : str, default ''
        Path to a directory to store files. Only used if the input .gtf-file needs sorting.
    remove_temp : bool, default True
        If True, remove the temporary directory after execution.
    inplace : bool, default True
        Whether to add the annotations to the adata object in place.

    Returns
    -------
    Optional[sc.AnnData]
        If inplace == True, the annotation is added to adata.var in place and None is returned.
        Else, a copy of the adata object is returned with the annotations added.

    References
    ----------
    .. [1] Kondili M, Fust A, Preussner J, Kuenne C, Braun T, and Looso M.
       UROPA: a tool for Universal RObust Peak Annotation.
       Scientific Reports 7 (2017), doi: 10.1038/s41598-017-02464-y

    Examples
    --------
    >>> custom_config = {"queries": [{"distance": [10000, 1000],
    ...                               "feature_anchor": "start",
    ...                               "feature": "gene"}],
    ...                  "priority": True,
    ...                  "show_attributes": "all"}
    >>> annotate_adata(adata, gtf="genes.gtf", config=custom_config)
    """

    # Make temporary directory if needed
    utils.create_dir(temp_dir)

    # Check that packages are installed
    utils.check_module("uropa")  # will raise an error if not installed
    import uropa.utils

    # TODO: Check input types
    # check_type(gtf, str, "gtf")
    # check_type(config, [type(None), dict], "config")
    # check_value(threads, vmin=1, name="threads")

    # Establish configuration dict
    logger.info("Setting up annotation configuration...")
    if config is None:
        cfg_dict = {"queries": [{"distance": [10000, 1000],
                                 "feature_anchor": "start",
                                 "feature": "gene",
                                 "name": "promoters"}],
                    "priority": True,
                    "show_attributes": "all"}
    else:
        cfg_dict = config

    cfg_dict = copy.deepcopy(cfg_dict)  # make sure that config is not being changed in place
    uropa_logger = uropa.utils.UROPALogger()
    cfg_dict = uropa.utils.format_config(cfg_dict, logger=uropa_logger)
    logger.info("Config dictionary: {0}".format(cfg_dict))

    # Read regions from .var table
    logger.info("Setting up genomic regions to annotate...")
    regions = adata.var.copy()

    # Establish columns for coordinates
    if coordinate_cols is None:
        coordinate_cols = adata.var.columns[:3]  # first three columns are coordinates
    else:
        utils.check_columns(adata.var, coordinate_cols, name="coordinate_cols")  # check that coordinate_cols are in adata.var

    # Test the coordinate columns
    utils.format_adata_var(adata, coordinate_cols, coordinate_cols)  # will raise an error if not valid or try to convert from index

    # Convert regions to dict for uropa
    idx2name = {i: name for i, name in enumerate(regions.index)}
    regions.reset_index(inplace=True, drop=True)  # drop index to get unique order for peak id
    region_dicts = []
    for idx, row in regions.iterrows():
        d = {"peak_chr": row[coordinate_cols[0]],
             "peak_start": int(row[coordinate_cols[1]]),
             "peak_end": int(row[coordinate_cols[2]]),
             "peak_id": idx}
        region_dicts.append(d)

    # Unzip, sort and index gtf if necessary
    gtf, tempfiles = _prepare_gtf(gtf, temp_dir)

    annotations_table = _annotate_features(region_dicts, threads, gtf, cfg_dict, best)

    # Preparation of adata.var update

    # Drop some columns already present
    drop_columns = ["peak_chr", "peak_start", "peak_end"]
    annotations_table.drop(columns=drop_columns, inplace=True)

    # Rename feat -> gene to prevent confusion with input features
    rename_dict = {c: c.replace("feat", "gene") for c in annotations_table.columns}
    rename_dict.update({'peak_id': '_peak_id',
                        'feature': 'annotation_feature',
                        'distance': 'distance_to_gene',
                        'relative_location': 'relative_location_to_gene',
                        'query': 'annotation_query'})
    annotations_table.rename(columns=rename_dict, inplace=True)

    # Set columns as categorical
    cols = rename_dict.values()
    cols = [col for col in cols if col not in ["distance_to_gene", "gene_ovl_peak", "peak_ovl_gene"]]
    for col in cols:
        annotations_table[col] = annotations_table[col].astype('category')

    # Check if columns already exist
    existing = set(regions.columns).intersection(annotations_table.columns)
    if len(existing) > 0:
        logger.warning("The following annotation columns already exist in adata.var: {0}".format(existing))
        logger.info("These columns will be overwritten by the annotation")
        regions.drop(columns=existing, inplace=True)

    # Merge original sites with annotations
    regions_annotated = regions.merge(annotations_table, how="outer", left_index=True, right_on="_peak_id")
    regions_annotated.index = [idx2name[i] for i in regions_annotated["_peak_id"]]  # put index names back into regions
    regions_annotated.drop(columns=["_peak_id"], inplace=True)

    # Duplicate peaks in adata if best is False
    if best is False:
        adata = adata[:, regions_annotated.index]  # duplicates rows of adata.X and .var

    # Save annotated regions to adata.var
    adata.var = regions_annotated

    logger.info("Finished annotation of features! The results are found in the .var table.")

    # Remove temporary directory
    if remove_temp:
        utils.rm_tmp(temp_dir, tempfiles)

    if inplace is False:
        return adata  # else return None
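# Usage sketch for annotate_adata (illustrative only; 'adata' and the file paths are
# placeholders, not shipped data). Shows the default promoter annotation and a run
# that keeps all valid hits instead of only the best one:
#
#   import scanpy as sc
#   import sctoolbox.tools.peak_annotation as pa
#
#   adata = sc.read_h5ad("atac.h5ad")                     # peaks in .var, coordinates in the first three columns
#   pa.annotate_adata(adata, gtf="genes.gtf", threads=4)  # adds gene columns to adata.var in place
#
#   # Keep all valid annotations; peaks with multiple hits are duplicated in the result,
#   # so request the annotated object as a return value instead of annotating in place:
#   adata_all = pa.annotate_adata(adata, gtf="genes.gtf", best=False, inplace=False)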
@beartype
def annotate_narrowPeak(filepath: str,
                        gtf: str,
                        config: Optional[dict[str, Any]] = None,
                        best: bool = True,
                        threads: int = 1,
                        temp_dir: str = "",
                        remove_temp: bool = True) -> pd.DataFrame:
    """
    Annotate narrowPeak files with genes from .gtf using UROPA.

    Parameters
    ----------
    filepath : str
        Path to the narrowPeak file to be annotated.
    gtf : str
        Path to the .gtf file containing the genes to be annotated.
    config : Optional[dict[str, Any]], default None
        A dictionary indicating how regions should be annotated.
        Default (None) is to annotate the feature 'gene' within -10000;1000bp of the gene start.
        See 'Examples' of annotate_adata for how to set up a custom configuration dictionary.
    best : bool, default True
        Whether to return the best annotation or all valid annotations.
    threads : int, default 1
        Number of threads to use for the annotation.
    temp_dir : str, default ''
        Path to the directory where the temporary files should be written.
    remove_temp : bool, default True
        If True, remove the temporary directory after execution.

    Returns
    -------
    pd.DataFrame
        Dataframe containing the annotations.
    """

    # Make temporary directory
    utils.create_dir(temp_dir)

    # Check that packages are installed
    utils.check_module("uropa")  # will raise an error if not installed
    import uropa.utils

    # Establish configuration dict
    logger.info("Setting up annotation configuration...")
    if config is None:
        cfg_dict = {"queries": [{"distance": [10000, 1000],
                                 "feature_anchor": "start",
                                 "feature": "gene",
                                 "name": "promoters"}],
                    "priority": True,
                    "show_attributes": "all"}
    else:
        cfg_dict = config

    cfg_dict = copy.deepcopy(cfg_dict)  # make sure that config is not being changed in place
    uropa_logger = uropa.utils.UROPALogger()
    cfg_dict = uropa.utils.format_config(cfg_dict, logger=uropa_logger)
    logger.info("Config dictionary: {0}".format(cfg_dict))

    region_dicts = _load_narrowPeak(filepath)

    # Unzip, sort and index gtf if necessary
    gtf, tempfiles = _prepare_gtf(gtf, temp_dir)

    annotation_table = _annotate_features(region_dicts, threads, gtf, cfg_dict, best)
    logger.info("Annotation done.")

    # Remove temporary directory
    if remove_temp:
        utils.rm_tmp(temp_dir, tempfiles)

    return annotation_table
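# Usage sketch for annotate_narrowPeak (illustrative; the file names are placeholders).
# The returned DataFrame has one row per peak (best=True) or one row per valid hit (best=False):
#
#   annotation_table = annotate_narrowPeak("peaks.narrowPeak", gtf="genes.gtf",
#                                          threads=4, temp_dir="tmp", remove_temp=True)
#   annotation_table.to_csv("peaks_annotated.tsv", sep="\t", index=False)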
@beartype
def _load_narrowPeak(filepath: str) -> list[dict[str, Union[str, int]]]:
    """
    Load narrowPeak file to annotate.

    Parameters
    ----------
    filepath : str
        Path to the narrowPeak file.

    Returns
    -------
    list[dict[str, Union[str, int]]]
        List of dictionaries containing peak range information.
    """

    logger.info("load regions_dict from: " + filepath)
    peaks = pd.read_csv(filepath, header=None, sep='\t')
    peaks = peaks.drop([3, 4, 5, 6, 7, 8, 9], axis=1)  # keep only chromosome, start and end
    peaks.columns = ["peak_chr", "peak_start", "peak_end"]
    peaks["peak_id"] = peaks.index

    region_dicts = []
    for i in range(len(peaks.index)):
        entry = peaks.iloc[i]
        region_dicts.append(entry.to_dict())  # avoid shadowing the builtin 'dict'

    return region_dicts


@beartype
def _prepare_gtf(gtf: str, temp_dir: str) -> Tuple[str, list[str]]:
    """
    Prepare the .gtf file for use in the annotation process.

    The file properties are checked and, if necessary, the file is sorted, compressed and indexed.

    Parameters
    ----------
    gtf : str
        Path to the .gtf file containing the genes to be annotated.
    temp_dir : str
        Path to the temporary directory for storing .gtf files.

    Returns
    -------
    Tuple[str, list[str]]
        Index 0: str : Path to the gtf file to use in the annotation.
        Index 1: list : List of paths to temporary files created.

    Raises
    ------
    ValueError
        1: If the GTF-file could not be uncompressed while trying to sort it.
        2: If one subprocess during the sorting step fails.
        3: If the GTF-file could not be read, for example due to an invalid format.
    """

    utils.check_module("pysam")
    import pysam

    # Check integrity of the gtf file
    utils._gtf_integrity(gtf)  # will raise an error if gtf is not valid

    tempfiles = []

    # Prepare .gtf file in terms of index and sorting
    logger.info("Preparing gtf file for annotation...")
    success = 0
    sort_done = 0
    while success == 0:

        try:  # try to open gtf with Tabix
            logger.info("- Reading gtf with Tabix")
            g = pysam.TabixFile(gtf)  # gtf can be .gz or not
            g.close()
            success = 1
            logger.info("Done preparing gtf!")

        except Exception:  # if not possible, try to index gtf
            logger.info("- Index of gtf not found - trying to index gtf")

            # First check if gtf was already gzipped
            if not utils._is_gz_file(gtf):
                base = os.path.basename(gtf)
                gtf_gz = os.path.join(temp_dir, base + ".gz")
                pysam.tabix_compress(gtf, gtf_gz, force=True)
                tempfiles.append(gtf_gz)
                gtf = gtf_gz

            # Try to index
            try:
                gtf_index = gtf + ".tbi"
                gtf = pysam.tabix_index(gtf, seq_col=0, start_col=3, end_col=4,
                                        keep_original=True, force=True, meta_char='#', index=gtf_index)
                tempfiles.append(gtf_index)

            except Exception:
                logger.info("- Indexing failed - the GTF is probably unsorted")

                # Start by uncompressing file if file is gz
                is_gz = utils._is_gz_file(gtf)
                if is_gz:
                    gtf_uncompressed = os.path.join(temp_dir, "uncompressed.gtf")
                    logger.info(f"- Uncompressing {gtf} to: {gtf_uncompressed}")
                    try:
                        utils.gunzip_file(gtf, gtf_uncompressed)
                    except Exception:
                        raise ValueError("Could not uncompress gtf file to sort. Please ensure that the input gtf is sorted.")
                    gtf = gtf_uncompressed
                    tempfiles.append(gtf_uncompressed)

                # Try to sort gtf
                if sort_done == 0:  # make sure sort was not already performed
                    gtf_sorted = os.path.join(temp_dir, "sorted.gtf")
                    sort_call = "grep -v \"^#\" {0} | sort -k1,1 -k4,4n > {1}".format(gtf, gtf_sorted)
                    logger.info("- Attempting to sort gtf with call: '{0}'".format(sort_call))

                    try:
                        _ = subprocess.check_output(sort_call, shell=True)
                        gtf = gtf_sorted  # this gtf will now go to the next iteration of the while-loop
                        sort_done = 1

                    except subprocess.CalledProcessError:
                        raise ValueError("Could not sort gtf file using command-line call: {0}".format(sort_call))
                else:
                    raise ValueError("Could not read input gtf - please check for the correct format.")

    # Force close of gtf file left open; pysam issue 1038
    proc = psutil.Process()
    for f in proc.open_files():
        if f.path == os.path.abspath(gtf):
            os.close(f.fd)

    return gtf, tempfiles


@beartype
def _annotate_features(region_dicts: list[dict[Literal["peak_chr", "peak_start", "peak_end", "peak_id"], str | int]],
                       threads: int,
                       gtf: str,
                       cfg_dict: Optional[dict[str, Union[list, bool, str, int, float]]],
                       best: bool) -> pd.DataFrame:
    """
    Annotate features.

    Parameters
    ----------
    region_dicts : list[dict[Literal["peak_chr", "peak_start", "peak_end", "peak_id"], str | int]]
        List of dictionaries with peak information.
    threads : int
        Number of threads to use for the annotation.
    gtf : str
        Path to the .gtf file.
    cfg_dict : Optional[dict[str, Union[list, bool, str, int, float]]]
        A dictionary indicating how regions should be annotated.
        Set to None to annotate the feature 'gene' within -10000;1000bp of the gene start.
    best : bool
        Whether to return the best annotation or all valid annotations.

    Returns
    -------
    pd.DataFrame
        Dataframe with the annotation.

    Examples
    --------
    >>> region_dicts = [{"peak_chr": "chr1", "peak_start": 100, "peak_end": 200, "peak_id": "peak1"},
    ...                 {"peak_chr": "chr1", "peak_start": 600, "peak_end": 800, "peak_id": "peak2"}]
    >>> cfg_dict = {"queries": [{"distance": [10000, 1000],
    ...                          "feature_anchor": "start",
    ...                          "feature": "gene"}],
    ...             "priority": True,
    ...             "show_attributes": "all"}
    """

    # Split input regions into chunks (one per thread)
    n_reg = len(region_dicts)
    per_chunk = int(np.ceil(n_reg / float(threads)))
    region_dict_chunks = [region_dicts[i:i + per_chunk] for i in range(0, n_reg, per_chunk)]

    # Calculate annotations for each chunk
    logger.info("Annotating regions...")
    annotations = []
    if threads == 1:
        logger.info("NOTE: Increase --threads to speed up computation")
        for region_chunk in region_dict_chunks:
            chunk_annotations = _annotate_peaks_chunk(region_chunk, gtf, cfg_dict)
            annotations.extend(chunk_annotations)
    else:
        # Start multiprocessing pool
        pool = mp.Pool(threads)

        # Start a job for each chunk
        jobs = []
        for region_chunk in region_dict_chunks:
            job = pool.apply_async(_annotate_peaks_chunk, (region_chunk, gtf, cfg_dict, ))
            jobs.append(job)
        pool.close()

        # TODO: Print progress
        """
        n_jobs = len(jobs)
        n_done = 0
        # pbar = tqdm.tqdm(total=n_jobs)
        # pbar.set_description("Annotating regions")
        # while n_done < n_jobs:
        #     n_current_done = sum([job.ready() for job in jobs])
        #     time.wait(1)
        #     if n_current_done != n_done:
        #         pbar.update()
        #         n_done = n_current_done
        """

        # Collect results
        for job in jobs:
            chunk_annotations = job.get()
            annotations.extend(chunk_annotations)

        pool.join()

    logger.info("Formatting annotations...")

    # Select best annotations
    if best is True:
        annotations = [annotations[i] for i, anno in enumerate(annotations) if anno["best_hit"] == 1]

    # Extend feat_attributes per annotation and format for output
    del_keys = ["raw_distance", "anchor_pos", "peak_center", "peak_length", "feat_length", "feat_center"]
    for anno in annotations:
        if "feat_attributes" in anno:
            for key in anno["feat_attributes"]:
                anno[key] = anno["feat_attributes"][key]
            del anno["feat_attributes"]

        # Remove certain keys
        for key in del_keys:
            if key in anno:
                del anno[key]

        # Remove best_hit column if best is True
        if best is True:
            del anno["best_hit"]

        # Convert any lists to string
        for key in anno:
            if isinstance(anno[key], list):
                anno[key] = anno[key][0]

    # Convert to pandas table
    annotations_table = pd.DataFrame(annotations)

    return annotations_table


@beartype
def _annotate_peaks_chunk(region_dicts: list[dict[Literal["peak_chr", "peak_start", "peak_end", "peak_id"], str | int]],
                          gtf: str,
                          cfg_dict: Optional[dict[str, Union[list, bool, str, int, float]]]) -> list[dict[str, Union[str, int]]]:
    """
    Multiprocessing-safe function to annotate a chunk of regions.

    Parameters
    ----------
    region_dicts : list[dict[Literal["peak_chr", "peak_start", "peak_end", "peak_id"], str | int]]
        List of dictionaries with peak information.
    gtf : str
        Path to the .gtf file.
    cfg_dict : Optional[dict[str, Union[list, bool, str, int, float]]]
        A dictionary indicating how regions should be annotated.

    Returns
    -------
    list[dict[str, Union[str, int]]]
        List of all valid annotations.
    """

    import pysam
    import uropa
    from uropa.annotation import annotate_single_peak

    logger = uropa.utils.UROPALogger()

    # Open tabix file
    tabix_obj = pysam.TabixFile(gtf)

    # For each peak in the input peaks, collect all valid annotations
    all_valid_annotations = []
    for i, region in enumerate(region_dicts):

        # Annotate single peak
        valid_annotations = annotate_single_peak(region, tabix_obj, cfg_dict, logger=logger)
        all_valid_annotations.extend(valid_annotations)

    tabix_obj.close()

    return all_valid_annotations