"""Module to calculate TSS enrichment scores."""
import scanpy as sc
import pysam
import os
import sys
import numpy as np
import pandas as pd
from tqdm import tqdm
import sctoolbox.tools as tools
import sctoolbox.utils as utils
import sctoolbox.utils.decorator as deco
from sctoolbox._settings import settings
from beartype.typing import Tuple, Optional
from beartype import beartype
import numpy.typing as npt
logger = settings.logger
[docs]
@beartype
def write_TSS_bed(gtf: str,
custom_TSS: str,
negativ_shift: int = 2000,
positiv_shift: int = 2000,
temp_dir: Optional[str] = None) -> Tuple[list, list]:
"""
Write a custom TSS file from a gene gtf file.
negativ_shift and positiv_shift are the number of bases of the flanks.
The output is a bed file with the following columns:
1: chr, 2: start, 3:stop
and a list of lists with the following columns:
1: chr, 2: start, 3:stop
Parameters
----------
gtf : str
path to gtf file
custom_TSS : str
path to output file
negativ_shift : int, default 2000
number of bases to shift upstream
positiv_shift : int, default 2000
number of bases to shift downstream
temp_dir : Optional[str], default None
path to temporary directory
Returns
-------
Tuple[list, list]
list of lists with the following columns:
1: chr, 2: start, 3:stop
"""
# Unzip, sort and index gtf if necessary
gtf, tempfiles = tools._prepare_gtf(gtf, temp_dir)
# Open tabix file
tabix_obj = pysam.TabixFile(gtf)
# Create list of TSS
tss_list = []
# Write TSS to file and list
logger.info("building temporary TSS file")
with open(custom_TSS, "w") as out_file:
for gene in tabix_obj.fetch(parser=pysam.asGTF()):
tss_list.append([gene.contig, gene.start - negativ_shift, gene.start + positiv_shift])
line = str(gene.contig) + '\t' + str(max(0, gene.start - negativ_shift)) + '\t' + str(
gene.start + positiv_shift) + '\n'
out_file.write(line)
return tss_list, tempfiles
[docs]
@beartype
def overlap_and_aggregate(fragments: str,
custom_TSS: str,
overlap: str,
tss_list: list[list[str | int]],
negativ_shift: int = 2000,
positiv_shift: int = 2000) -> Tuple[dict[str, list], list[str]]:
"""
Overlap the fragments with the custom TSS file and aggregates the fragments in a dictionary.
The dictionary has the following structure:
{barcode: [tss_agg, n_fragments]}
tss_agg is a numpy array with the aggregated fragments around the TSS with flanks of negativ_shift and positiv_shift.
n_fragments is the number of fragments overlapping the TSS.
Parameters
----------
fragments : str
path to fragments file
custom_TSS : str
path to custom TSS file
overlap : str
path to output file
tss_list : list[list[str | int]]
list of lists with the following columns:
1: chr, 2: start, 3:stop
negativ_shift : int, default 2000
number of bases to shift upstream
positiv_shift : int, default 2000
number of bases to shift downstream
Returns
-------
Tuple[dict[str, list], list[str]]
dictionary with the following structure:
{barcode: [tss_agg, n_fragments]}
tss_agg is a numpy array with the aggregated fragments around the TSS with flanks of negativ_shift and positiv_shift.
n_fragments is the number of fragments overlapping the TSS.
list[str] contains a list of temporary files.
"""
tempfiles = []
# Check if fragments are sorted
if utils._bed_is_sorted(fragments):
logger.info("fragments are sorted")
else:
logger.info("sorting fragments")
# build path to sorted fragments
base_name = os.path.basename(fragments)
file_name = os.path.splitext(base_name)[0]
sorted_bedfile = os.path.join(os.path.split(fragments)[0], (file_name + '_sorted.bed'))
tempfiles.append(sorted_bedfile)
# sort fragments
utils._sort_bed(fragments, sorted_bedfile)
# sorted fragments to be used for overlap
fragments = sorted_bedfile
# Overlap two bedfiles (FRAGMENTS / TSS_sites)
logger.info("overlapping fragments with TSS")
bedtools = os.path.join('/'.join(sys.executable.split('/')[:-1]), 'bedtools')
intersect_cmd = f'{bedtools} intersect -a {fragments} -b {custom_TSS} -u -sorted > {overlap}'
# run command
os.system(intersect_cmd)
# Read in overlap file
logger.info("opening overlap file")
overlap_list = utils._read_bedfile(overlap)
# initialize dictionary
tSSe_cells = {}
# initialize counter for overlap_list
k = 0
# Aggregate Overlap
logger.info("aggregating fragments")
for tss in tqdm(tss_list, desc='Aggregating'):
for i in range(k, len(overlap_list)):
fragment = overlap_list[i]
# update counter
k = i
# check if fragment is on the same chromosome and start of fragment is smaller than end of tss
overlap_condition = (fragment[0] == tss[0]) & \
(fragment[1] <= tss[2])
# break if not overlap_condition
if not overlap_condition:
break
# calculate start and stop of the overlap
start = max(0, fragment[1] - tss[1])
stop = min(-1, fragment[2] - tss[2])
# calculate number of fragments
n_fragments = fragment[4]
# check if barcode is already in dictionary. If yes, update tss_agg and n_fragments
if fragment[3] in tSSe_cells:
new_tss_agg = tSSe_cells[fragment[3]][0]
new_tss_agg[start:stop] = tSSe_cells[fragment[3]][0][start:stop] + n_fragments
new_count = tSSe_cells[fragment[3]][1] + n_fragments
tSSe_cells[fragment[3]] = [new_tss_agg, new_count]
# if not, create new entry in dictionary
else:
tSS_agg = np.zeros(negativ_shift + positiv_shift, dtype=int)
tSS_agg[start:stop] = n_fragments
tSSe_cells[fragment[3]] = [tSS_agg, n_fragments]
return tSSe_cells, tempfiles
[docs]
@beartype
def calc_per_base_tsse(tSSe_df: pd.DataFrame,
min_bias: float = 0.01,
edge_size: int = 100) -> npt.ArrayLike:
"""
Calculate per base tSSe by dividing the tSSe by the bias. The bias is calculated by averaging the edges of the tSSe.
The edges are defined by the edge_size.
Parameters
----------
tSSe_df : pd.DataFrame
dataframe with the following columns:
1: barcode, 2: tSSe, 3: n_fragments
min_bias : float, default 0.01
minimum bias to avoid division by zero
edge_size : int, default 100
number of bases to use for the edges
Returns
-------
npt.ArrayLike
numpy array with the per base tSSe
"""
# make np.array for calculation
tSS_agg_arr = np.array(tSSe_df['TSS_agg'].to_list())
# get the edges as bias
logger.info("calculating bias")
border_upstream = tSS_agg_arr[:, 0:edge_size]
border_downstream = tSS_agg_arr[:, -edge_size:]
# Concatenate along the second axis
edges = np.concatenate((border_upstream, border_downstream), axis=1)
# calculate bias
bias = np.sum(edges, axis=1) / 200
bias[bias == 0] = min_bias # Can I do this?
# calculate per base tSSe
logger.info("calculating per base tSSe")
per_base_tsse = tSS_agg_arr / bias[:, None]
return per_base_tsse
[docs]
@beartype
def global_tsse_score(per_base_tsse: npt.ArrayLike,
negativ_shift: int,
edge_size: int = 50) -> npt.ArrayLike:
"""
Calculate the global tSSe score by averaging the per base tSSe around the TSS.
Parameters
----------
per_base_tsse : npt.ArrayLike
numpy array with the per base tSSe
negativ_shift : int
number of bases to shift upstream
edge_size : int, default 50
number of bases to use for the edges
Returns
-------
npt.ArrayLike
numpy array with the global tSSe score
"""
# calculate global tSSe score
logger.info("calculating global tSSe score")
center = per_base_tsse[:, negativ_shift - edge_size:negativ_shift + edge_size]
global_tsse_score = np.sum(center, axis=1) / (edge_size * 2)
return global_tsse_score
[docs]
@beartype
def tsse_scoring(fragments: str,
gtf: str,
negativ_shift: int = 2000,
positiv_shift: int = 2000,
edge_size_total: int = 100,
edge_size_per_base: int = 50,
min_bias: float = 0.01,
keep_tmp: bool = False,
temp_dir: str = "") -> pd.DataFrame:
"""
Calculate the tSSe score for each cell.
Calculating the TSSe score is done like described in: "Chromatin accessibility profiling by ATAC-seq" Fiorella et al. 2022
Parameters
----------
fragments : str
path to fragments file
gtf : str
path to gtf file
negativ_shift : int, default 2000
number of bases to shift upstream
positiv_shift : int, default 2000
number of bases to shift downstream
edge_size_total : int, default 100
number of bases to use for the edges for the global tSSe score
edge_size_per_base : int, default 50
number of bases to use for the edges for the per base tSSe score
min_bias : float, default 0.01
minimum bias to avoid division by zero
keep_tmp : bool, default False
keep temporary files
temp_dir : str, default ""
path to temporary directory
Returns
-------
pd.DataFrame
dataframe with the following columns:
1: barcode, 2: tSSe, 3: n_fragments
"""
tmp_files = []
# create temporary file paths
custom_TSS = os.path.join(temp_dir, "custom_TSS.bed")
overlap = os.path.join(temp_dir, "overlap.bed")
# add temporary file paths to list
tmp_files.append(custom_TSS)
tmp_files.append(overlap)
# write custom TSS bed file
tss_list, temp = write_TSS_bed(gtf, custom_TSS, negativ_shift=negativ_shift, positiv_shift=positiv_shift,
temp_dir=temp_dir)
# add temporary file paths to list
tmp_files.extend(temp)
# overlap fragments with custom TSS
tSSe_cells, temp = overlap_and_aggregate(fragments, custom_TSS, overlap, tss_list)
# add temporary file paths to list
tmp_files.extend(temp)
# calculate per base tSSe
tSSe_df = pd.DataFrame.from_dict(tSSe_cells, orient='index', columns=['TSS_agg', 'total_ov'])
# calculate per base tSSe
per_base_tsse = calc_per_base_tsse(tSSe_df, min_bias=min_bias, edge_size=edge_size_total)
# calculate global tSSe score
tsse_score = global_tsse_score(per_base_tsse, negativ_shift, edge_size=edge_size_per_base)
# add tSSe score to adata
tSSe_df['tsse_score'] = tsse_score
# remove temporary files
if not keep_tmp:
logger.info("cleaning up temporary files")
for tmp_file in tmp_files:
os.remove(tmp_file)
return tSSe_df
[docs]
@deco.log_anndata
@beartype
def add_tsse_score(adata: sc.AnnData,
fragments: str,
gtf: str,
negativ_shift: int = 2000,
positiv_shift: int = 2000,
edge_size_total: int = 100,
edge_size_per_base: int = 50,
min_bias: float = 0.01,
keep_tmp: bool = False,
temp_dir: str = "") -> sc.AnnData:
"""
Add the tSSe score to the adata object.
Calculating the TSSe score is done like described in: "Chromatin accessibility profiling by ATAC-seq" Fiorella et al. 2022
Parameters
----------
adata : sc.AnnData
AnnData object
fragments : str
path to fragments.bed
gtf : str
path to gtf file
negativ_shift : int, default 2000
number of bases to shift upstream for the flanking regions
positiv_shift : int, default 2000
number of bases to shift downstream for the flanking regions
edge_size_total : int, default 100
number of bases to use for the edges for the global tSSe score
edge_size_per_base : int, default 50
number of bases to use for the edges for the per base tSSe score
min_bias : float, default 0.01
minimum bias to avoid division by zero
keep_tmp : bool, default False
keep temporary files
temp_dir : str, default ""
path to temporary directory
Returns
-------
sc.AnnData
AnnData object with added tSSe score
"""
logger.info("adding tSSe score to adata object")
tSSe_df = tsse_scoring(fragments,
gtf,
negativ_shift=negativ_shift,
positiv_shift=positiv_shift,
edge_size_total=edge_size_total,
edge_size_per_base=edge_size_per_base,
min_bias=min_bias,
keep_tmp=keep_tmp,
temp_dir=temp_dir)
# add tSSe score to adata
adata.obs = adata.obs.join(tSSe_df['tsse_score'])
return adata