import datetime
import hashlib
import json
import logging
import pathlib
from itertools import combinations
from time import sleep
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import polars as pl
import rich_click as click
import seaborn
import statsmodels
import statsmodels.stats.multitest
import statsmodels.stats.weightstats
import stringdb
from scikit_posthocs import posthoc_dunn
from scipy.stats import kruskal, mannwhitneyu
from sklearn.preprocessing import MinMaxScaler, RobustScaler
import lbfextract.fextract
from lbfextract.plotting_lib.plotting_functions import plot_heatmap_diff_active_gi
from lbfextract.transcription_factor_analysis.loaders import ResultsLoader
from lbfextract.transcription_factor_analysis.utils import remove_outliers, generate_time_stamp
logger = logging.getLogger(__name__)
[docs]
def get_state(mean_1, mean_2):
state_m1 = np.where(mean_1 < 0, "+", "-").astype(str)
state_m2 = np.where(mean_2 < 0, "+", "-").astype(str)
return np.char.add(state_m1, state_m2)
[docs]
def signed_log2(x):
return np.sign(x) * np.log2(np.abs(x) + 1)
[docs]
class DiffSignalAnalysis:
def __init__(self,
df: pl.DataFrame | pd.DataFrame,
output_path: pathlib.Path,
signal_col_index: list[str],
outer_group_column: str,
inner_group_column: str,
value_column: str,
correction_method: str,
max_iter: int,
alpha: float,
overwrite: bool,
rm_outliers: bool,
save_individual_plots: bool,
use_provided_gi: bool = False,
user_background: list[str] = None,
normalize: bool = False
):
self.normalize = normalize
self.non_interpretable_fc = None
self.user_background = user_background
self.use_provided_gi = use_provided_gi
self.df = pl.from_pandas(df) if isinstance(df, pd.DataFrame) else df
self.outer_group_column = outer_group_column
self.inner_group_column = inner_group_column
self.value_column = value_column
self.correction_method = correction_method
self.max_iter = max_iter
self.alpha = alpha
self.signal_col_index = signal_col_index
self.results: pd.DataFrame | None = None
self.output_path = output_path
self.overwrite = overwrite
self.time_stamp = generate_time_stamp()
self.enrichment_results: dict[str, pd.DataFrame] | None = None
self.remove_outliers = rm_outliers
self.save_individual_plots = save_individual_plots
self.data_for_plot = None
if not self.output_path.exists():
logger.info(
f"output path: {self.output_path} does not exists. creating it with all missing parent folders.")
self.output_path.mkdir(exist_ok=True, parents=True)
self.run_id = self.__get_run_id()
if self.output_path.joinpath(self.get_output_folder()).exists():
logger.info(f"run id: {self.run_id} already exists.")
if self.overwrite:
logger.info(f"overwriting run id: {self.run_id}")
else:
logger.info(f"not overwriting run id: {self.run_id}, to overwrite set overwrite=True")
return
else:
self.output_path.joinpath(self.get_output_folder()).mkdir(exist_ok=True, parents=True)
[docs]
@staticmethod
def get_enrichment(genes: list[str], background: list[str] = None) -> pd.DataFrame | None:
try:
string_ids = stringdb.get_string_ids(genes).queryItem.to_list()
background_ids = stringdb.get_string_ids(
background).queryItem.to_list() if background is not None else None
sleep(1)
return stringdb.get_enrichment(string_ids,
background_string_identifiers=background_ids
)[["category", "preferredNames", "p_value", "fdr", "description"]]
except ValueError as e:
logger.warning("There was an error with the string database server. "
"Enrichment analysis could not be retrieved. the error was:\n"
f"{e}")
except KeyError:
logger.warning("Get enrichment failed due to a databse problem")
return None
[docs]
def get_output_folder(self):
return f"{self.time_stamp}__{self.run_id}"
[docs]
def run(self):
logger.info("Preprocessing the data ... ")
self.__preprocess_df()
logger.info("Extracting the differentially active TF ... ")
df = self.__get_diff_active_gi()
logger.info("Retrieving the enrichment analysis ... ")
self.__get_enrichment_result_per_group_pair(df)
return self
[docs]
def save_results(self):
output_path = self.output_path / self.get_output_folder()
output_path.mkdir(exist_ok=True, parents=True)
rejected_tfs = self.results.query("Rejected == True").genomic_interval.unique()
df_heatmap = (
self.__extract_column_for_plot(self.df.loc[self.df[self.inner_group_column].isin(rejected_tfs)])
.pivot_table(index="sample_name",
columns=self.inner_group_column,
values=self.value_column,
fill_value=0)
)
def get_outliers_mask(column, bound="lower"):
Q1 = column.quantile(0.25)
Q3 = column.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
if bound == "lower":
return column < lower_bound
else:
return column > upper_bound
def replace_outliers(x_):
x = x_.copy()
mask_lower = get_outliers_mask(x, bound="lower")
mask_upper = get_outliers_mask(x, bound="upper")
x[mask_lower] = x[~mask_lower].min()
x[mask_upper] = x[~mask_upper].max()
return x
any_rejected = True if rejected_tfs.shape[0] > 0 else False
clusters = {"groups": self.df.groupby("sample_name")[self.outer_group_column].first()}
filter_rows = self.results[self.inner_group_column].isin(rejected_tfs)
adjusted_pvals = (
self.results.loc[filter_rows]
.pivot_table(index="group_pairs",
columns=self.inner_group_column,
values="adj_p-val",
fill_value=np.nan).T
)
mean_1 = self.results.mean_group_1.values
mean_2 = self.results.mean_group_2.values
fc = mean_1 / mean_2
log2_fc = np.log2(np.abs(fc))
self.non_interpretable_fc = fc < 0
states = get_state(mean_1, mean_2)
self.results.loc[:, "log2_fc"] = log2_fc
self.results.loc[:, "signed_log2"] = signed_log2(fc)
self.results.loc[:, "state"] = states
self.results.loc[:, "change"] = mean_1 - mean_2
log2_fc_vals = (
self.results
.assign(log2_fc=log2_fc)
.pivot_table(index="group_pairs",
columns=self.inner_group_column,
values="log2_fc",
fill_value=np.nan)
).T
if not df_heatmap.empty:
df_heatmap = pd.DataFrame(
data=MinMaxScaler().fit_transform(df_heatmap.apply(lambda x: replace_outliers(x), axis=0)),
columns=df_heatmap.columns,
index=df_heatmap.index
)
figsize_height = 10 + (df_heatmap.shape[1] // 20) + (clusters["groups"].unique().shape[0] * 2)
figsize_width = 10 + ((df_heatmap.shape[1] // 20) * 3)
fig, data_for_plot = plot_heatmap_diff_active_gi(
df_heatmap,
clusters=clusters,
split_key="groups",
adjusted_pval=adjusted_pvals,
log2_fc=log2_fc_vals.loc[rejected_tfs, :],
output_path=output_path,
filename="fextract_diff_signals_heatmap.png",
save=True,
figsize=(
figsize_width,
figsize_height,
),
cmap="RdYlBu_r"
)
self.data_for_plot = data_for_plot
fig = self.plot_barplot_rejected_per_group()
if fig:
fig.savefig(output_path / "fextract_diff_signals_per_group_bar_plot.png")
plt.close(fig)
self.__save_results_per_group_pair(any_rejected=any_rejected)
return self
def __preprocess_df(self):
df = self.df.to_pandas()
df = df[~df[self.outer_group_column].isin(["NA", "NaN", "None"])]
df = df[~df[self.outer_group_column].isna()]
self.df = df
def __get_diff_active_gi(self):
df_for_statistical_tests = self.__extract_column_for_plot(self.df)
if self.normalize:
df_for_statistical_tests = self.__normalize_between_samples(df_for_statistical_tests)
genomic_intervals_groups = df_for_statistical_tests.groupby(self.inner_group_column)
group_names = self.df[self.outer_group_column].unique()
groups_names_combinations = sorted(combinations(group_names, 2))
n_groups = len(group_names)
if n_groups == 2:
df = self.__get_diff_active_gi_two_groups(genomic_intervals_groups, groups_names_combinations)
self.results = self.__get_global_correction_two_groups(df)
elif n_groups > 2:
df = self.__get_diff_active_gi_multiple_groups(genomic_intervals_groups, groups_names_combinations)
self.results = self.__get_global_correction_multiple_groups(df)
else:
raise ValueError(f"Number of groups should be 2 or more found {len(group_names)}")
return df
def __get_enrichment_result_per_group_pair(self, df: pd.DataFrame):
diff_active_region_per_group_pair = self.results.query("Rejected == True").groupby("group_pairs")
background = self.__get_background(df)
self.enrichment_results = {
f"{group_pair}": self.get_enrichment(
diff_active_region_per_group_pair.get_group(group_pair)[self.inner_group_column]
.to_list(),
background=background
)
for group_pair in diff_active_region_per_group_pair.groups
}
def __get_run_id(self):
dict_for_hash = {
"df": self.df,
"outer_group_column": self.outer_group_column,
"inner_group_column": self.inner_group_column,
"value_column": self.value_column,
"correction_method": self.correction_method,
"max_iter": self.max_iter,
"alpha": self.alpha,
"signal_col_index": self.signal_col_index,
"output_path": self.output_path,
}
h = hashlib.sha1()
h.update(json.dumps(dict_for_hash, sort_keys=True, default=str).encode())
return h.hexdigest()
def __extract_column_for_statistical_test(self, df):
return df[[self.outer_group_column, self.inner_group_column, self.value_column]]
def __extract_column_for_plot(self, df):
return df[[self.outer_group_column, self.inner_group_column, self.value_column, "sample_name"]]
def __normalize_between_samples(self, df, normalization_type=RobustScaler, normalization_type_kwargs=None):
df = df.copy()
normalization_type_kwargs = {} if normalization_type_kwargs is None else normalization_type_kwargs
df_group_col = df[[self.outer_group_column, "sample_name"]].copy()
df = df.pivot_table(index="sample_name",
columns=self.inner_group_column,
values=self.value_column)
columns = df.columns.copy()
index = df.index.copy()
df = normalization_type(**normalization_type_kwargs).fit_transform(df)
df = pd.DataFrame(df, columns=columns, index=index)
df = df.reset_index().melt(id_vars=["sample_name"], var_name=self.inner_group_column,
value_name=self.value_column)
return pd.merge(df, df_group_col, on='sample_name', how='inner')
@staticmethod
def __attach_group_column(c, group_name):
return pd.DataFrame(
{
"group": c.apply(lambda x: group_name),
"value": c
}
)
def __get_diff_active_gi_two_groups(self, genomic_intervals_groups, groups_names_combinations):
group_1_name, group_2_name = groups_names_combinations[0]
matrix_res = []
for gi in genomic_intervals_groups.groups:
gi_groups_grouped_by_group = genomic_intervals_groups.get_group(gi).groupby(self.outer_group_column)
long_df = pd.concat([
self.__attach_group_column(remove_outliers(group[self.value_column]), name)
if self.remove_outliers
else self.__attach_group_column(group[self.value_column], name)
for name, group
in gi_groups_grouped_by_group
])
groups_values = {name: group.value.values for name, group in long_df.groupby("group")}
_, p_value = mannwhitneyu(groups_values[group_1_name], groups_values[group_2_name])
group_df_means = long_df.groupby("group").mean()
group_df_stds = long_df.groupby("group").std()
for comb in groups_names_combinations:
matrix_res.append(
[
"-".join(comb),
group_df_means.loc[comb[0], "value"],
group_df_means.loc[comb[1], "value"],
group_df_stds.loc[comb[0], "value"],
group_df_stds.loc[comb[1], "value"],
gi,
p_value
]
)
columns_names = [
"group_pairs",
"mean_group_1",
"mean_group_2",
"std_group_1",
"std_group_2",
self.inner_group_column,
"p_value"
]
df = pd.DataFrame(matrix_res, columns=columns_names)
df[self.inner_group_column] = df[self.inner_group_column].str.replace("/", "_")
return df
def __get_diff_active_gi_multiple_groups(self, genomic_intervals_groups, groups_names_combinations):
matrix_res = []
for gi in genomic_intervals_groups.groups:
gi_groups_grouped_by_group = genomic_intervals_groups.get_group(gi).groupby(self.outer_group_column)
long_df = pd.concat([
self.__attach_group_column(remove_outliers(group[self.value_column]), name)
if self.remove_outliers
else self.__attach_group_column(group[self.value_column], name)
for name, group
in gi_groups_grouped_by_group])
kruskal_p_value = kruskal(*[group.value for name, group in long_df.groupby("group")]).pvalue
perform_dunn_test = True if kruskal_p_value <= self.alpha else False
if perform_dunn_test:
dunn_test_res = posthoc_dunn(long_df, val_col="value", group_col="group")
group_df_means = long_df.groupby("group").mean()
group_df_stds = long_df.groupby("group").std()
for comb in groups_names_combinations:
if perform_dunn_test:
p_value = dunn_test_res.loc[comb[0], comb[1]]
else:
p_value = np.nan
matrix_res.append(
[
"-".join(comb),
group_df_means.loc[comb[0], "value"],
group_df_means.loc[comb[1], "value"],
group_df_stds.loc[comb[0], "value"],
group_df_stds.loc[comb[1], "value"],
gi,
kruskal_p_value,
p_value
]
)
columns_names = [
"group_pairs",
"mean_group_1",
"mean_group_2",
"std_group_1",
"std_group_2",
self.inner_group_column,
"kruskal_p_value",
"p_value"
]
df = pd.DataFrame(matrix_res, columns=columns_names)
df[self.inner_group_column] = df[self.inner_group_column].str.replace("/", "_")
return df
def __get_background(self, df):
if self.use_provided_gi:
background = df.genomic_interval.unique()
elif self.user_background is not None:
background = self.user_background
else:
background = None
return background
def __get_global_correction_multiple_groups(self, df_):
df_ = df_.copy()
unique_kruskal = df_.loc[:, [self.inner_group_column, "kruskal_p_value"]].drop_duplicates()
unique_kruskal.loc[:, "kruskal_p_value"] = statsmodels.stats.multitest.multipletests(
unique_kruskal["kruskal_p_value"],
alpha=self.alpha,
method=self.correction_method,
maxiter=self.max_iter,
is_sorted=False,
returnsorted=False
)[1]
unique_kruskal = unique_kruskal.set_index(self.inner_group_column)
df_.loc[:, "kruskal_p_value"] = df_[self.inner_group_column].apply(
lambda x: unique_kruskal.loc[x, "kruskal_p_value"]
)
passed_kruscal = df_.loc[:, "kruskal_p_value"] <= self.alpha
if passed_kruscal.sum() == 0:
df_.loc[:, "Rejected"] = False
df_.loc[:, "adj_p-val"] = np.nan
df_.loc[:, "p_value"] = np.nan
return df_
df_.loc[passed_kruscal, "adj_p-val"] = statsmodels.stats.multitest.multipletests(
df_.loc[passed_kruscal, "p_value"],
alpha=self.alpha,
method=self.correction_method,
maxiter=self.max_iter,
is_sorted=False,
returnsorted=False
)[1]
df_.loc[df_.loc[:, "adj_p-val"].isna(), "p_value"] = np.nan
df_.loc[:, "Rejected"] = np.logical_and(
df_.loc[:, "adj_p-val"] <= self.alpha,
passed_kruscal
)
return df_
def __get_global_correction_two_groups(self, df_):
df_ = df_.copy()
df_.loc[:, "adj_p-val"] = statsmodels.stats.multitest.multipletests(
df_.loc[:, "p_value"],
alpha=self.alpha,
method=self.correction_method,
maxiter=self.max_iter,
is_sorted=False,
returnsorted=False
)[1]
df_.loc[:, "Rejected"] = df_.loc[:, "adj_p-val"] <= self.alpha
return df_
[docs]
def plot_barplot_rejected_per_group(self):
if self.results is None:
logger.warning("Analysis has not been run yet. Run it using the run() method before plotting.")
return None
fig, ax = plt.subplots(1, figsize=(10, 10), dpi=300)
df_to_plot = self.results.query("Rejected == True").groupby("group_pairs").count()[["Rejected"]]
if not df_to_plot.empty:
df_to_plot.plot(
kind="bar", ax=ax)
counts = self.results.query("Rejected == True").groupby("group_pairs").count()["Rejected"]
max_counts = counts.max()
for i, count in enumerate(counts):
ax.text(i, count + (max_counts * 0.03), str(count), ha='center',
bbox=dict(boxstyle="round", fc="w", ec="k"))
ax.spines[['right', 'top', 'bottom']].set_visible(False)
return fig
else:
logger.warning("No group pairs were rejected")
return None
[docs]
def plot_coverage(self, gi):
if self.results is None:
logger.warning("Analysis has not been run yet. Run it using the run() method before plotting.")
return None
signal_shape = len(self.signal_col_index)
if self.results is None:
logger.warning("Analysis has not been run yet. Running analysis first")
self.run()
df = self.df.loc[
self.df[self.inner_group_column] == gi,
[self.outer_group_column, self.inner_group_column] + self.signal_col_index
]
df = df[~df[self.outer_group_column].isin(["NA", "NaN", "None"])].set_index(self.outer_group_column).drop(
"genomic_interval", axis=1)
colors_std, colors_means = {}, {}
outer_groups = self.df[self.outer_group_column].unique()
palette = list(seaborn.color_palette("Paired", len(outer_groups) * 2))
for group in sorted(outer_groups):
colors_std[group] = palette.pop()
colors_means[group] = palette.pop()
means = df.groupby(self.outer_group_column).mean()
stds = df.groupby(self.outer_group_column).std()
fig, ax = plt.subplots(figsize=(20, 10))
fig.suptitle(gi, fontsize=20)
for i in stds.index.to_list():
ax.plot(means.loc[i], color=colors_means[i], label=i)
ax.plot(means.loc[i] + stds.loc[i], linestyle='dashed', color="lightgray")
ax.plot(means.loc[i] - stds.loc[i], linestyle='dashed', color="lightgray")
ax.fill_between(np.arange(signal_shape), means.loc[i] - stds.loc[i], means.loc[i] + stds.loc[i],
color=colors_std[i],
alpha=0.3)
ax.spines[['right', 'top', 'bottom', "left"]].set_visible(False)
ax.set_xticks(list(range(0, signal_shape + 1, signal_shape // 4)),
list(range(0, signal_shape + 1, signal_shape // 4)))
ax.axvline(signal_shape // 2, ls="-.", color="black",
label="GI center")
fig.legend()
ax.grid(linestyle='dashed')
ax.set_ylabel("normalized coverage")
ax.set_xlabel("position around GI center")
return fig
def __save_results_per_group_pair(self, any_rejected: bool):
if self.results is None:
logger.warning("Analysis has not been run yet. Running analysis first")
self.run()
self.results.loc[self.non_interpretable_fc, "log2_fc"] = "NI"
run_folder = self.get_output_folder()
res_by_group_pair = self.results.groupby("group_pairs")
for group_pair in res_by_group_pair.groups:
output_path_group_pair = self.output_path / run_folder / group_pair
output_path_group_pair.mkdir(exist_ok=True, parents=True)
group = res_by_group_pair.get_group(group_pair)
(
group
.sort_values("adj_p-val", ascending=True)
.to_csv(output_path_group_pair / f"fextract_diff_signal_analysis_{group_pair}.csv")
)
if self.enrichment_results.get(group_pair, None) is not None:
self.enrichment_results.get(group_pair).to_csv(output_path_group_pair / f"enrichment_{group_pair}.csv")
if any_rejected and self.save_individual_plots:
rejected_gi = group.query('Rejected == True')
for genomic_interval in rejected_gi[self.inner_group_column]:
fig = self.plot_coverage(gi=genomic_interval)
fig.savefig(output_path_group_pair / f"coverage_{genomic_interval}.png")
plt.close(fig)
return self
[docs]
def parse_indices(ctx, param, value: str) -> tuple[int, int] | None:
return tuple([int(i) for i in value.split(",")])[0:2] if value else None
[docs]
def create_path(ctx, param, value):
if not pathlib.Path(value).exists():
pathlib.Path(value).mkdir(parents=True, exist_ok=True)
return value
@click.group(name="post_extraction_analysis_commands")
@click.option("--path_to_res_summary",
required=True,
type=click.Path(exists=True,
path_type=pathlib.Path,
dir_okay=True,
file_okay=False,
readable=True,
resolve_path=False,
allow_dash=True),
help="Path to the directory containing the fextract in batch results."
)
@click.option("--signal_length", type=int, default=4000, show_default=True)
@click.option("--center_signal_indices", type=str, default="1800,2200", callback=parse_indices, show_default=True)
@click.option("--flanking_signal_indices", type=str, default="1000,3000", callback=parse_indices, show_default=True)
@click.option("--normalize", is_flag=True, default=False, show_default=True)
@click.option("--output_path",
type=click.Path(
exists=False, path_type=pathlib.Path, dir_okay=True, file_okay=False,
readable=True, resolve_path=False, allow_dash=True, writable=True
),
default=pathlib.Path.cwd(), show_default=True, callback=create_path)
@click.option("--overwrite", is_flag=True, default=False, show_default=True)
@click.option("--path_to_sample_sheet", type=click.Path(exists=True, path_type=pathlib.Path, dir_okay=False,
file_okay=True, readable=True, resolve_path=False,
allow_dash=True), default=None, show_default=True)
@click.option("--outer_group_column", type=str, default="group", show_default=True)
@click.option("--inner_group_column", type=str, default="genomic_interval", show_default=True)
@click.option("--value_column", type=str, default="amplitude", show_default=True)
@click.option("--correction_method", type=str, default="hs", show_default=True)
@click.option("--max_iter", type=int, default=1, show_default=True)
@click.option("--alpha", type=float, default=0.05, show_default=True)
@click.option("--rm_outliers", is_flag=True, default=False, show_default=True)
@click.option("--commit_hash", type=str, default="Not Provided", show_default=True)
@click.option("--save_individual_plots", is_flag=True, default=False, show_default=True)
@click.option("--use_provided_gi", is_flag=True, default=False, show_default=True,
help="Use the provided genomic intervals as background for the enrichment analysis")
@click.option("--user_background",
type=click.Path(exists=True, path_type=pathlib.Path, dir_okay=False,
file_okay=True, readable=True, resolve_path=False,
allow_dash=True),
default=None,
show_default=True,
help="Path to a file containing a list of genomic intervals to be used as background for the enrichment analysis")
@click.pass_context
def cli(ctx,
path_to_res_summary: pathlib.Path,
signal_length: int,
center_signal_indices: tuple,
flanking_signal_indices: tuple,
normalize: bool,
output_path: pathlib.Path,
outer_group_column: str,
inner_group_column: str,
value_column: str,
correction_method: str,
max_iter: int,
alpha: float,
overwrite: bool,
path_to_sample_sheet: pathlib.Path,
rm_outliers: bool,
commit_hash: str,
save_individual_plots: bool,
user_background: pathlib.Path,
use_provided_gi: bool
):
"""
This command group contains all commands related to post-extraction analysis of feature extraction results.
\b
For a list of available commands and their short descriptions, use:
\b
lbfextract post_extraction_analysis_commands --help
\n
\b
For detailed help on a specific command, including available arguments and their default values, use:
\b
lbfextract post_extraction_analysis_commands [command] --help
\n
"""
ctx.obj = dict()
ctx.obj["path_to_res_summary"] = path_to_res_summary
ctx.obj["signal_length"] = signal_length
ctx.obj["center_signal_indices"] = center_signal_indices
ctx.obj["flanking_signal_indices"] = flanking_signal_indices
ctx.obj["normalize"] = normalize
ctx.obj["output_path"] = output_path
ctx.obj["outer_group_column"] = outer_group_column
ctx.obj["inner_group_column"] = inner_group_column
ctx.obj["value_column"] = value_column
ctx.obj["correction_method"] = correction_method
ctx.obj["max_iter"] = max_iter
ctx.obj["alpha"] = alpha
ctx.obj["overwrite"] = overwrite
ctx.obj["path_to_sample_sheet"] = path_to_sample_sheet
ctx.obj["rm_outliers"] = rm_outliers
ctx.obj["commit_hash"] = commit_hash
ctx.obj["save_individual_plots"] = save_individual_plots
ctx.obj["use_provided_gi"] = use_provided_gi
ctx.obj["user_background"] = user_background
@cli.command(short_help="It extracts differentially active TFs between 2 or multiple groups")
@click.pass_context
def get_differentially_active_genomic_intervals(ctx):
"""
This command extracts the differentially active TFs between 2 or more groups and returns a table of differentially
active TFs together with their adjusted-p value calculated according to the selected method
(default: Benjamini-Hochberg procedure). In the former case the Mann-Whitney-Wilcoxon test is used, in the latter
the Kruskal-Wallis test is used.
"""
logger.info("Starting the analysis ... ")
starting_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
output_path = ctx.obj["output_path"] / "fextract_diff_signal_results"
output_path.mkdir(exist_ok=True)
accessibility_extraction_config = dict(
start=ctx.obj["center_signal_indices"][0],
end=ctx.obj["center_signal_indices"][1]
)
if ctx.obj["user_background"]:
user_background = pd.read_csv(ctx.obj["user_background"], header=None)[0].to_list()
else:
user_background = None
logger.info(f"Loading all signals present in: {ctx.obj['path_to_res_summary']} ... ")
df_results = ResultsLoader(ctx.obj["path_to_res_summary"],
accessibility_extraction_config,
ctx.obj["signal_length"],
ctx.obj["flanking_signal_indices"],
ctx.obj["normalize"],
path_to_sample_sheet=ctx.obj["path_to_sample_sheet"],
grouping_column=ctx.obj["outer_group_column"]).load()
diff_signal_analysis = DiffSignalAnalysis(df=df_results,
outer_group_column=ctx.obj["outer_group_column"],
inner_group_column=ctx.obj["inner_group_column"],
value_column=ctx.obj["value_column"],
correction_method=ctx.obj["correction_method"],
max_iter=ctx.obj["max_iter"],
alpha=ctx.obj["alpha"],
signal_col_index=[str(i) for i in range(ctx.obj["signal_length"])],
output_path=output_path,
overwrite=ctx.obj["overwrite"],
rm_outliers=ctx.obj["rm_outliers"],
save_individual_plots=ctx.obj["save_individual_plots"],
user_background=user_background,
use_provided_gi=ctx.obj["use_provided_gi"])
logger.info(f"Starting the differential activity calculation ... ")
diff_signal_analysis.run()
logger.info(f"Saving the results ... ")
diff_signal_analysis.save_results()
logger.info(f"Results were saved.They are located at {output_path}")
metadata = dict(
paccakge_repo="LBFextract",
lbf_version=f"{lbfextract.__version__}",
commithash=ctx.obj["commit_hash"],
starting_time=starting_time,
finisheing_time=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
run_id=diff_signal_analysis.run_id,
path_to_res_summary=ctx.obj["path_to_res_summary"],
signal_length=ctx.obj["signal_length"],
center_signal_indices=ctx.obj["center_signal_indices"],
flanking_signal_indices=ctx.obj["flanking_signal_indices"],
normalize=ctx.obj["normalize"],
output_path=output_path,
outer_group_column=ctx.obj["outer_group_column"],
inner_group_column=ctx.obj["inner_group_column"],
value_column=ctx.obj["value_column"],
correction_method=ctx.obj["correction_method"],
max_iter=ctx.obj["max_iter"],
alpha=ctx.obj["alpha"],
overwrite=ctx.obj["overwrite"],
remove_outliers=ctx.obj["rm_outliers"],
save_individual_plots=ctx.obj["save_individual_plots"]
)
logger.info("Metadata:")
logger.info(metadata)
results = dict(
results=diff_signal_analysis.results,
enrichment_results=diff_signal_analysis.enrichment_results,
data_for_plot=diff_signal_analysis.data_for_plot
)
import dill
path_to_metadata = output_path / diff_signal_analysis.get_output_folder() / "metadata.json"
logger.info(f"Saving metadata to: {path_to_metadata} ")
path_to_result_object = output_path / diff_signal_analysis.get_output_folder() / "results.pkl"
logger.info(f"Saving result object to: {path_to_result_object} ")
with open(path_to_metadata, "w") as f:
json.dump(metadata, f, indent=4, sort_keys=True, default=str)
with open(path_to_result_object, "wb") as f:
dill.dump(results, f)
logger.info("finished")
@cli.command(short_help="It generates the samples sheet used by the get_differentially_active_genomic_intervals "
"command.")
@click.pass_context
def generate_sample_sheet(ctx):
"""
This command generate the sample sheet, which needs to be filled with the group membership before running the
get_differentially_active_genomic_intervals command.
"""
output_path = ctx.obj["output_path"] / "fextract_diff_signal_results"
output_path.mkdir(exist_ok=True)
df_results = ResultsLoader(ctx.obj["path_to_res_summary"], ctx.obj["signal_length"],
ctx.obj["center_signal_indices"], ctx.obj["flanking_signal_indices"],
ctx.obj["normalize"], path_to_sample_sheet=ctx.obj["path_to_sample_sheet"])
sample_sheet = df_results.generate_sample_sheet()
sample_sheet.to_csv(output_path / "sample_sheet.csv", index=True)
if __name__ == "__main__":
cli(obj={})