Source code for lbfextract.fextract_batch_coverage.plugin

import logging
import operator
import pathlib
from functools import reduce
from typing import Any, Union, List, Optional

import click
import dill as pickle
import numpy as np
import pandas as pd
import pyranges
import pysam
from matplotlib import pyplot as plt

import lbfextract.fextract
import lbfextract.fextract.signal_transformer
from lbfextract.core import App
from lbfextract.fextract.cli_lib import calculate_reference_distribution, get_peaks
from lbfextract.fextract.schemas import AppExtraConfig, Config, SingleSignalTransformerConfig, ReadFetcherConfig, \
    SignalSummarizer
from lbfextract.fextract_batch_coverage.schemas import PlotConfig
from lbfextract.plotting_lib.plotting_functions import plot_heatmap_kde_amplitude, plot_signal_batch, \
    plot_signal
from lbfextract.utils import load_temporary_bed_file, filter_bam, get_tmp_fextract_file_name, generate_time_stamp, \
    check_input_bed, check_input_bam, filter_out_empty_bed_files, sanitize_file_name
from lbfextract.utils_classes import Signal

logger = logging.getLogger(__name__)


[docs] def extract_coverage(coverage_extractor, transformed_reads): array = np.vstack(transformed_reads.apply(lambda x: coverage_extractor(x), axis=1)) return array
[docs] def identity(x, axis=None): return x
[docs] class IntervalIterator: def __init__(self, df: pd.DataFrame, path_to_bam: pathlib.Path, multiple_iterators: bool, extra_bases: int, flanking_window: int, flip_based_on_strand: bool | None = None): self.df = df self.df_by_name = self.df.groupby("Name") self.sequence = list(self.df_by_name.groups.keys()) self.path_to_bam = path_to_bam self._index = 0 self.multiple_iterators = multiple_iterators self.extra_bases = extra_bases self.signal_transformer = identity self.signal_summarizer = identity self.flanking_window = flanking_window self.flip_based_on_strand = flip_based_on_strand self.bamfile = pysam.AlignmentFile(self.path_to_bam) def __iter__(self): return self
[docs] def set_signal_transformer(self, signal_transformer): self.signal_transformer = signal_transformer
[docs] def set_signal_summarizer(self, signal_summarizer): self.signal_summarizer = signal_summarizer
def __getstate__(self): # Return the state that can be pickled state = self.__dict__.copy() state["bamfile"] = None return state def __setstate__(self, state): self.__dict__.update(state) self.path_to_bam = pysam.AlignmentFile(state['path_to_bam']) def __next__(self): if self._index < len(self.sequence): key = self.sequence[self._index] df = self.df_by_name.get_group(key).copy() df["reads_per_interval"] = [ self.bamfile.fetch(row.Chromosome, row.Start, row.End, multiple_iterators=False) for row in df.itertuples() ] df["Start"] += self.extra_bases df["End"] -= self.extra_bases array = np.vstack([ self.signal_transformer(row) for row in df.itertuples() ]) if self.flip_based_on_strand: strands = df["Strand"].values flip_mask = strands == "-" array[flip_mask] = np.fliplr(array[flip_mask]) self._index += 1 indices = np.arange(array.shape[1]) index_flanking = np.logical_or( indices < self.flanking_window, indices > (array.shape[1] - self.flanking_window) ) means_flanking = array[:, index_flanking].mean(axis=1) mask = means_flanking != 0 normalized_array = np.zeros_like(array, dtype=float) normalized_array[mask] = array[mask] / means_flanking[mask, None] return {key: self.signal_summarizer(normalized_array, axis=0)} else: raise StopIteration
[docs] class FextractHooks:
[docs] @lbfextract.hookimpl def fetch_reads(self, path_to_bam: pathlib.Path, path_to_bed: pathlib.Path, config: Any, extra_config: Any) -> IntervalIterator: """ :param path_to_bam: path to the bam file :param path_to_bed: path to the bed file with the regions to be filtered :param config: configuration file containing the configuration object required by the fetch_reads function :param extra_config: extra configuration that may be used in the hook implementation :return: ReadsPerIntervalContainer object containing all the ReadsPerInterval objects in all the intervals contained in the bed file """ config_f = config.f or 2 config_F = config.F or 3868 check_input_bed(path_to_bed) check_input_bam(path_to_bam) bed_files_paths = filter_out_empty_bed_files(path_to_bed) temporary_bed_files_name = [ load_temporary_bed_file(bed_file=bed, extra_bases=config.extra_bases, window=config.window, flanking_region_window=config.flanking_region_window, n_binding_sites=config.n_binding_sites, run_id=extra_config.ctx["run_id"])[1].as_df() for bed in bed_files_paths ] concat_bed = pd.concat(temporary_bed_files_name) path_to_tmp_file = get_tmp_fextract_file_name(extra_config.ctx["run_id"]) pyranges.PyRanges(concat_bed).to_csv(path_to_tmp_file, sep="\t", header=None) tmp_bam_file = filter_bam(path_to_bam, path_to_tmp_file, cores=extra_config.cores, run_id=extra_config.ctx["run_id"], f=config_f, F=config_F) return IntervalIterator(concat_bed, tmp_bam_file, multiple_iterators=True, extra_bases=config.extra_bases, flanking_window=config.flanking_region_window)
[docs] @lbfextract.hookimpl def save_fetched_reads(self, reads_per_interval_container: IntervalIterator, config: Any, extra_config: Any ) -> None: """ Hook implementing the strategy to save the reads fetched from the intervals :param reads_per_interval_container: ReadsPerIntervalContainer containing information about the genomic region and the reads mapping to it :param config: object contaiing the configuration for saving the reads :param extra_config: extra configuration that may be used in the hook implementation :return: None """ output_path = extra_config.ctx["output_path"] run_id = extra_config.ctx["id"] signal_type = extra_config.ctx["single_signal_transformer_config"].signal_transformer with open(output_path / f"{run_id}__{signal_type}__reads_iterator.pkl", "wb") as f: pickle.dump(reads_per_interval_container, f) return output_path / f"{run_id}__{signal_type}__reads_iterator.pkl"
[docs] @lbfextract.hookimpl def load_fetched_reads(self, config: Any, extra_config: AppExtraConfig) -> pd.DataFrame: """ :param config: config specific to the function :param extra_config: extra configuration that may be used in the hook implementation """ output_path = extra_config.ctx["output_path"] run_id = extra_config.ctx["id"] signal_type = extra_config.ctx["single_signal_transformer_config"].signal_transformer with open(output_path / f"{run_id}__{signal_type}__reads_iterator.pkl", "rb") as f: return pickle.load(f)
[docs] @lbfextract.hookimpl def transform_single_intervals(self, transformed_reads: IntervalIterator, config: Any, extra_config: Any) -> IntervalIterator: """ :param transformed_reads: ReadsPerIntervalContainer containing a list of ReadsPerInterval which are basically lists with information about start and end of the interval :param config: config specific to the function :param extra_config: config containing context information plus extra parameters """ transformed_reads.flip_based_on_strand = config.flip_based_on_strand signal_transformers_dict = { "coverage": {"class": "TFBSCoverage", "config": {"gc_correction": config.gc_correction, "tag": config.tag} }, "coverage_dyads": {"class": "TFBSCoverageAroundDyads", "config": {"n": config.n, "gc_correction": config.gc_correction, "tag": config.tag, "peaks": config.peaks} }, "middle_point_coverage": {"class": "TFBSMiddlePointCoverage", "config": {"gc_correction": config.gc_correction, "tag": config.tag} }, "middle_n_points_coverage": {"class": "TFBSNmiddlePointCoverage", "config": {"n": config.n, "gc_correction": config.gc_correction, "tag": config.tag} }, "sliding_window_coverage": {"class": "TFBSSlidingWindowCoverage", "config": {"window_size": config.window_size, "gc_correction": config.gc_correction, "tag": config.tag} }, "peter_ulz_coverage": {"class": "PeterUlzCoverage", "config": {"read_start": config.read_start, "read_end": config.read_end, "gc_correction": config.gc_correction, "tag": config.tag} }, "wps_coverage": {"class": "WPSCoverage", "config": {"window_size": config.window_size, "gc_correction": config.gc_correction, "min_fragment_length": config.min_fragment_length, "max_fragment_length": config.max_fragment_length, "tag": config.tag, } }, } coverage_extractor_params = signal_transformers_dict[config.signal_transformer]["config"] coverage_extractor = getattr(lbfextract.fextract.signal_transformer, signal_transformers_dict[config.signal_transformer]["class"])( **coverage_extractor_params) transformed_reads.set_signal_transformer(coverage_extractor) return transformed_reads
[docs] @lbfextract.hookimpl def transform_all_intervals(self, single_intervals_transformed_reads: IntervalIterator, config: Any, extra_config: Any) -> Signal: """ :param single_intervals_transformed_reads: Signal object containing the signals per interval :param config: config specific to the function :param extra_config: extra configuration that may be used in the hook implementation """ summary_method = { "mean": np.nanmean, "median": np.nanmedian, "max": np.nanmax, "min": np.nanmin, } single_intervals_transformed_reads.set_signal_summarizer(summary_method[config.summarization_method]) summarized_signal_per_bed = [i for i in single_intervals_transformed_reads] summarized_signal_per_bed = pd.DataFrame(reduce(operator.ior, summarized_signal_per_bed, {})).T return Signal( array=summarized_signal_per_bed.values, metadata=summarized_signal_per_bed.index, tags=tuple([extra_config.ctx["single_signal_transformer_config"].signal_transformer, config.summarization_method, ]) )
[docs] @lbfextract.hookimpl def plot_signal(self, signal: Signal, config: PlotConfig, extra_config: Any) -> dict[str, pathlib.Path]: """ :param signal: Signal object containing the signals per interval :param config: object containing the parameters of the plot :param extra_config: extra configuration that may be used in the hook implementation """ big_fs = 20 medium_fs = 15 top = config.top if config.top else 5 bottom = config.bottom if config.bottom else 5 fig_pths = {} df = pd.DataFrame(signal.array, index=signal.metadata) flanking = int((df.shape[1] // 5) * 2) if not config.flanking else config.flanking fig_plot_heatmap_kde_amplitude, ax = plot_heatmap_kde_amplitude( array=df, title=" ".join(signal.tags).capitalize(), title_font_size=config.title_font_size if config.title_font_size else 25, general_font_size=config.general_font_size if config.general_font_size else 20, tf_to_annotate=config.tf_to_annotate if config.tf_to_annotate else None, ylabel=config.ylabel if config.ylabel else None, flanking=flanking, annotation_center_line=config.annotation_center_line if config.annotation_center_line else "interval center", window_center=config.window_center if config.window_center else 50, top=top, bottom=bottom, ) signal_type = "_".join(signal.tags) time_stamp = generate_time_stamp() run_id = extra_config.ctx["id"] file_name = f"{time_stamp}__{run_id}__{signal_type}__heatmap_kde_amplitude_plot.png" file_name_sanitized = sanitize_file_name(file_name) output_path = extra_config.ctx["output_path"] / file_name_sanitized fig_plot_heatmap_kde_amplitude.savefig(output_path, dpi=600) fig_pths["plot_heatmap_kde_amplitude"] = output_path plt.close(fig_plot_heatmap_kde_amplitude) fig_plot_signal_batch, ax = plot_signal_batch( df, apply_savgol=config.apply_savgol if config.apply_savgol else False, savgol_window_length=config.savgol_window_length if config.apply_savgol else 11, savgol_polyorder=config.savgol_polyorder if config.apply_savgol else 3, signal=signal_type, title=f"{signal_type.capitalize()} top and bottom BED files\n(ranked by accessibility)", color=config.color if config.color else "blue", label=f"{signal_type} signal summary", flanking=flanking, xlabel=config.xlabel if config.xlabel else None, window_center=config.window_center if config.window_center else 50, top=top, bottom=bottom, figsize=(20, 10), ) file_name = f"{time_stamp}__{run_id}__{signal_type}__batch_signals.png" file_name_sanitized = sanitize_file_name(file_name) output_path = extra_config.ctx["output_path"] / file_name_sanitized fig_plot_signal_batch.savefig(output_path, dpi=600) fig_pths["plot_signal_batch"] = output_path plt.close(fig_plot_signal_batch) for i in range(df.shape[0]): file_name = f"{time_stamp}__{run_id}__{signal_type}__{df.index[i]}__signal.png" file_name_sanitized = sanitize_file_name(file_name) title = ( f"SIGNAL TYPE: {signal_type}\n" f"ID: {extra_config.ctx['path_to_bam'].stem}\n" f"BED file: {df.index[i]}" ) fig_plot_signal, ax = plot_signal(df.iloc[i, :].values, label="center", title=title, title_font_size=big_fs, general_font_size=medium_fs) ax.set_ylabel(signal_type, fontsize=medium_fs) ax.set_xlabel("Position", fontsize=medium_fs) ax.set_xticklabels(ax.get_xticklabels(), fontsize=medium_fs) ax.set_yticklabels(ax.get_yticklabels(), fontsize=medium_fs) output_path = extra_config.ctx["output_path"] / file_name_sanitized fig_plot_signal.savefig(output_path, dpi=600) fig_pths[i] = output_path plt.close(fig_plot_signal) return fig_pths
[docs] @lbfextract.hookimpl def save_signal(self, signal: Signal, config: Any, extra_config: Any) -> None: """ :param signal: Signal object containing the signals per interval :param extra_config: extra configuration that may be used in the hook implementation """ output_path = extra_config.ctx["output_path"] time_stamp = generate_time_stamp() run_id = extra_config.ctx["id"] signal_type = "_".join(signal.tags) file_name = f"{time_stamp}__{run_id}__{signal_type}__signal.csv" file_name_sanitized = sanitize_file_name(file_name) file_path = output_path / file_name_sanitized logging.info(f"Saving signal to {file_path}") df = pd.DataFrame(signal.array, index=signal.metadata) pd.DataFrame(df).to_csv(file_path)
[docs] class CliHook:
[docs] @lbfextract.hookimpl_cli def get_command(self) -> Union[click.Command, List[click.Command]]: @click.command( short_help="It extracts the fragment coverage signal from a BAM file for each BED file provided.") @click.option('--path_to_bam', type=click.Path(exists=False, file_okay=True, dir_okay=True, writable=False, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the bam file to be used') @click.option('--path_to_bed', type=click.Path(exists=False, file_okay=False, dir_okay=True, writable=False, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the directory containing the bed files to be used') @click.option('--output_path', type=click.Path(exists=False, file_okay=False, dir_okay=True, writable=True, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the output directory') @click.option("--skip_read_fetching", is_flag=True, show_default=True, help='Boolean flag. When it is set, the fetching of the reads is skipped and the latest' 'timestamp of this run (identified by the id) is retrieved') @click.option("--exp_id", default=None, type=str, show_default=True, help="run id") @click.option("--window", default=1000, type=int, show_default=True, help="Integer describing the number of bases to be extracted around the middle point of an" " interval present in the bedfile") @click.option("--flanking_window", default=1000, type=int, show_default=True, help="Integer describing the number of bases to be extracted after the window") @click.option("--extra_bases", default=2000, type=int, show_default=True, help="Integer describing the number of bases to be extracted from the bamfile when removing the " "unused bases to be sure to get all the proper paires, which may be mapping up to 2000 bs") @click.option("--n_binding_sites", default=1000, type=int, show_default=True, help="number of intervals to be used to extract the signal, if it is higher then the provided" "intervals, all the intervals will be used") @click.option("--summarization_method", default="mean", type=click.Choice(["mean", "median", "max", "min"]), show_default=True, help=f"method to be used to summarize the signal: {{ mean, median, max, min }}") @click.option("--cores", default=1, type=int, show_default=True, help="number of cores to be used") @click.option("--flip_based_on_strand", is_flag=True, show_default=False, default=False, help="Boolean flag. When it is set, the signal is flipped based on the strand") @click.option('--gc_correction_tag', type=str, default=None, help='tag to be used to extract gc coefficient per read from a bam file') def extract_coverage_in_batch(path_to_bam: pathlib.Path, path_to_bed: pathlib.Path, output_path: pathlib.Path, skip_read_fetching, window: int, flanking_window: int, extra_bases: int, n_binding_sites: int, summarization_method: str, cores: int, exp_id: Optional[str], flip_based_on_strand: bool = True, gc_correction_tag: Optional[str] = None ): """ extract_coverage_in_batch extracts the fragment coverage from multiple BED files at once and generates a signal for each BED file provided. """ read_fetcher_config = { "window": window, "flanking_region_window": flanking_window, "extra_bases": extra_bases, "n_binding_sites": n_binding_sites, } reads_transformer_config = {} single_signal_transformer_config = { "signal_transformer": "coverage", "flip_based_on_strand": flip_based_on_strand, "gc_correction": True if gc_correction_tag else False, "tag": gc_correction_tag } transform_all_intervals_config = { "summarization_method": summarization_method, } plot_signal_config = {} save_signal_config = {} extra_config = { "cores": cores } output_path.mkdir(parents=True, exist_ok=True) output_path_interval_spec = output_path / f"{path_to_bam.stem}" / f"{path_to_bed.stem}" output_path_interval_spec.mkdir(parents=True, exist_ok=True) res = App(plugins_name=["coverage_in_batch", ], path_to_bam=path_to_bam, path_to_bed=path_to_bed, output_path=output_path_interval_spec or pathlib.Path.cwd(), skip_read_fetching=skip_read_fetching, read_fetcher_config=ReadFetcherConfig(read_fetcher_config), reads_transformer_config=Config(reads_transformer_config), single_signal_transformer_config=SingleSignalTransformerConfig(single_signal_transformer_config), transform_all_intervals_config=SignalSummarizer(transform_all_intervals_config), plot_signal_config=PlotConfig(plot_signal_config), save_signal_config=Config(save_signal_config), extra_config=AppExtraConfig(extra_config), id=exp_id).run() return res @click.command( short_help="It extracts the coverage around dyads signal from a BAM file for each BED file provided.") @click.option('--path_to_bam', type=click.Path(exists=False, file_okay=True, dir_okay=True, writable=False, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the bam file to be used') @click.option('--path_to_bed', type=click.Path(exists=False, file_okay=False, dir_okay=True, writable=False, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the directory containing the bed files to be used') @click.option('--output_path', type=click.Path(exists=False, file_okay=False, dir_okay=True, writable=True, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the output directory') @click.option("--skip_read_fetching", is_flag=True, show_default=True, help='Boolean flag. When it is set, the fetching of the reads is skipped and the latest' 'timestamp of this run (identified by the id) is retrieved') @click.option("--exp_id", default=None, type=str, show_default=True, help="run id") @click.option("--window", default=1000, type=int, show_default=True, help="Integer describing the number of bases to be extracted around the middle point of an " "interval present in the bedfile") @click.option("--flanking_window", default=1000, type=int, show_default=True, help="Integer describing the number of bases to be extracted after the window") @click.option("--extra_bases", default=2000, type=int, show_default=True, help="Integer describing the number of bases to be extracted from the bamfile when removing the " "unused bases to be sure to get all the proper paires, which may be mapping up to 2000 bs") @click.option("--n_binding_sites", default=1000, type=int, show_default=True, help="number of intervals to be used to extract the signal, if it is higher then the provided" "intervals, all the intervals will be used") @click.option("--summarization_method", default="mean", type=click.Choice(["mean", "median", "max", "min"]), show_default=True, help=f"method to be used to summarize the signal: {{ mean, median, max, min }}") @click.option("--cores", type=int, show_default=True, default=1, help="number of cores to be used for the computation") @click.option("--flip_based_on_strand", is_flag=True, show_default=False, default=False, help="Boolean flag. When it is set, the signal is flipped based on the strand") @click.option('--gc_correction_tag', type=str, default=None, help='tag to be used to extract gc coefficient per read from a bam file') @click.option("--n", default=1, type=int, show_default=True, help="number of bases to retain around the dyad") def extract_coverage_around_dyads_in_batch(path_to_bam: pathlib.Path, path_to_bed: pathlib.Path, output_path: pathlib.Path, skip_read_fetching, window: int, flanking_window: int, extra_bases: int, n_binding_sites: int, summarization_method: str, n: int, cores: int, exp_id: Optional[str], flip_based_on_strand: bool = True, gc_correction_tag: Optional[str] = None ): """ extract_coverage_around_dyads_in_batch extracts the fragment coverage around dyads from multiple BED files at once and generates a signal for each BED file provided. """ read_fetcher_config = { "window": window, "flanking_region_window": flanking_window, "extra_bases": extra_bases, "n_binding_sites": n_binding_sites, } reads_transformer_config = {} single_signal_transformer_config = { "n": n, "signal_transformer": "coverage_dyads", "flip_based_on_strand": flip_based_on_strand, "gc_correction": True if gc_correction_tag else False, "tag": gc_correction_tag } transform_all_intervals_config = { "summarization_method": summarization_method, } distribution = calculate_reference_distribution(path_to_sample=path_to_bam, min_length=0, max_length=600, chr="chr12", start=34_300_000, end=34_500_000 ) peaks = get_peaks(distribution, height=0.01, distance=100) single_signal_transformer_config["peaks"] = [peaks[0]] plot_signal_config = {} save_signal_config = {} extra_config = { "cores": cores } output_path.mkdir(parents=True, exist_ok=True) output_path_interval_spec = output_path / f"{path_to_bam.stem}" / f"{path_to_bed.stem}" output_path_interval_spec.mkdir(parents=True, exist_ok=True) res = App(plugins_name=["coverage_in_batch", ], path_to_bam=path_to_bam, path_to_bed=path_to_bed, output_path=output_path_interval_spec or pathlib.Path.cwd(), skip_read_fetching=skip_read_fetching, read_fetcher_config=ReadFetcherConfig(read_fetcher_config), reads_transformer_config=Config(reads_transformer_config), single_signal_transformer_config=SingleSignalTransformerConfig(single_signal_transformer_config), transform_all_intervals_config=SignalSummarizer(transform_all_intervals_config), plot_signal_config=PlotConfig(plot_signal_config), save_signal_config=Config(save_signal_config), extra_config=AppExtraConfig(extra_config), id=exp_id).run() return res @click.command( short_help="It extracts the midpoint coverage signal from a BAM file for each BED file provided.") @click.option('--path_to_bam', type=click.Path(exists=False, file_okay=True, dir_okay=True, writable=False, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the bam file to be used') @click.option('--path_to_bed', type=click.Path(exists=False, file_okay=False, dir_okay=True, writable=False, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the directory containing the bed files to be used') @click.option('--output_path', type=click.Path(exists=False, file_okay=False, dir_okay=True, writable=True, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the output directory') @click.option("--skip_read_fetching", is_flag=True, show_default=True, help='Boolean flag. When it is set, the fetching of the reads is skipped and the latest' 'timestamp of this run (identified by the id) is retrieved') @click.option("--exp_id", default=None, type=str, show_default=True, help="run id") @click.option("--window", default=1000, type=int, show_default=True, help="Integer describing the number of bases to be extracted around the middle point of an " "interval present in the bedfile") @click.option("--flanking_window", default=1000, type=int, show_default=True, help="Integer describing the number of bases to be extracted after the window") @click.option("--extra_bases", default=2000, type=int, show_default=True, help="Integer describing the number of bases to be extracted from the bamfile when removing the " "unused bases to be sure to get all the proper paires, which may be mapping up to 2000 bs") @click.option("--n_binding_sites", default=1000, type=int, show_default=True, help="number of intervals to be used to extract the signal, if it is higher then the provided" "intervals, all the intervals will be used") @click.option("--summarization_method", default="mean", type=click.Choice(["mean", "median", "max", "min"]), show_default=True, help=f"method to be used to summarize the signal: {{ mean, median, max, min }}") @click.option("--cores", default=1, type=int, show_default=True, help="number of cores to be used") @click.option("--flip_based_on_strand", is_flag=True, show_default=False, default=False, help="Boolean flag. When it is set, the signal is flipped based on the strand") @click.option('--gc_correction_tag', type=str, default=None, help='tag to be used to extract gc coefficient per read from a bam file') def extract_middle_point_coverage_in_batch(path_to_bam: pathlib.Path, path_to_bed: pathlib.Path, output_path: pathlib.Path, skip_read_fetching, window: int, flanking_window: int, extra_bases: int, n_binding_sites: int, summarization_method: str, cores: int, exp_id: Optional[str], flip_based_on_strand: bool = True, gc_correction_tag: Optional[str] = None ): """ extract_middle_point_coverage_in_batch extracts the fragment coverage considering only the central position of each read from multiple BED files at once and generates a signal for each BED file provided. """ read_fetcher_config = { "window": window, "flanking_region_window": flanking_window, "extra_bases": extra_bases, "n_binding_sites": n_binding_sites, } reads_transformer_config = {} single_signal_transformer_config = { "signal_transformer": "middle_point_coverage", "flip_based_on_strand": flip_based_on_strand, "gc_correction": True if gc_correction_tag else False, "tag": gc_correction_tag } transform_all_intervals_config = { "summarization_method": summarization_method, } plot_signal_config = {} save_signal_config = {} extra_config = { "cores": cores } output_path.mkdir(parents=True, exist_ok=True) output_path_interval_spec = output_path / f"{path_to_bam.stem}" / f"{path_to_bed.stem}" output_path_interval_spec.mkdir(parents=True, exist_ok=True) res = App(plugins_name=["coverage_in_batch", ], path_to_bam=path_to_bam, path_to_bed=path_to_bed, output_path=output_path_interval_spec or pathlib.Path.cwd(), skip_read_fetching=skip_read_fetching, read_fetcher_config=ReadFetcherConfig(read_fetcher_config), reads_transformer_config=Config(reads_transformer_config), single_signal_transformer_config=SingleSignalTransformerConfig(single_signal_transformer_config), transform_all_intervals_config=SignalSummarizer(transform_all_intervals_config), plot_signal_config=PlotConfig(plot_signal_config), save_signal_config=Config(save_signal_config), extra_config=AppExtraConfig(extra_config), id=exp_id).run() return res @click.command(short_help="It extracts the middle-n points coverage signal from a BAM file for each BED file " "provided.") @click.option('--path_to_bam', type=click.Path(exists=False, file_okay=True, dir_okay=True, writable=False, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the bam file to be used') @click.option('--path_to_bed', type=click.Path(exists=False, file_okay=False, dir_okay=True, writable=False, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the directory containing the bed files to be used') @click.option('--output_path', type=click.Path(exists=False, file_okay=False, dir_okay=True, writable=True, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the output directory') @click.option("--skip_read_fetching", is_flag=True, show_default=True, help='Boolean flag. When it is set, the fetching of the reads is skipped and the latest' 'timestamp of this run (identified by the id) is retrieved') @click.option("--exp_id", default=None, type=str, show_default=True, help="run id") @click.option("--window", default=1000, type=int, show_default=True, help="Integer describing the number of bases to be extracted around the middle point of an " "interval present in the bedfile") @click.option("--flanking_window", default=1000, type=int, show_default=True, help="Integer describing the number of bases to be extracted after the window") @click.option("--extra_bases", default=2000, type=int, show_default=True, help="Integer describing the number of bases to be extracted from the bamfile when removing the " "unused bases to be sure to get all the proper paires, which may be mapping up to 2000 bs") @click.option("--n_binding_sites", default=1000, type=int, show_default=True, help="number of intervals to be used to extract the signal, if it is higher then the provided" "intervals, all the intervals will be used") @click.option("--summarization_method", default="mean", type=click.Choice(["mean", "median", "max", "min"]), show_default=True, help=f"method to be used to summarize the signal: {{ mean, median, max, min }}") @click.option("--cores", default=1, type=int, show_default=True, help="number of cores to be used") @click.option("--n_middle_pos", default=10, type=int, show_default=True, help="number of position around the middle point to be extracted") @click.option("--flip_based_on_strand", is_flag=True, show_default=False, default=False, help="Boolean flag. When it is set, the signal is flipped based on the strand") @click.option('--gc_correction_tag', type=str, default=None, help='tag to be used to extract gc coefficient per read from a bam file') def extract_middle_n_points_coverage_in_batch(path_to_bam: pathlib.Path, path_to_bed: pathlib.Path, output_path: pathlib.Path, skip_read_fetching, window: int, flanking_window: int, extra_bases: int, n_binding_sites: int, n_middle_pos: int, summarization_method: str, cores: int, exp_id: Optional[str], flip_based_on_strand: bool = True, gc_correction_tag: Optional[str] = None): """ extract_middle_n_points_coverage_in_batch extracts the fragment coverage considering only the central positions of each read from multiple BED files at once and generates a signal for each BED file provided. """ read_fetcher_config = { "window": window, "flanking_region_window": flanking_window, "extra_bases": extra_bases, "n_binding_sites": n_binding_sites, } reads_transformer_config = {} single_signal_transformer_config = { "signal_transformer": "middle_n_points_coverage", "n": n_middle_pos, "flip_based_on_strand": flip_based_on_strand, "gc_correction": True if gc_correction_tag else False, "tag": gc_correction_tag } transform_all_intervals_config = { "summarization_method": summarization_method, } plot_signal_config = {} save_signal_config = {} extra_config = { "cores": cores } output_path.mkdir(parents=True, exist_ok=True) output_path_interval_spec = output_path / f"{path_to_bam.stem}" / f"{path_to_bed.stem}" output_path_interval_spec.mkdir(parents=True, exist_ok=True) res = App(plugins_name=["coverage_in_batch", ], path_to_bam=path_to_bam, path_to_bed=path_to_bed, output_path=output_path_interval_spec or pathlib.Path.cwd(), skip_read_fetching=skip_read_fetching, read_fetcher_config=ReadFetcherConfig(read_fetcher_config), reads_transformer_config=Config(reads_transformer_config), single_signal_transformer_config=SingleSignalTransformerConfig(single_signal_transformer_config), transform_all_intervals_config=SignalSummarizer(transform_all_intervals_config), plot_signal_config=PlotConfig(plot_signal_config), save_signal_config=Config(save_signal_config), extra_config=AppExtraConfig(extra_config), id=exp_id).run() return res @click.command( short_help="It extracts the sliding window coverage signal from a BAM file for each BED file provided.") @click.option('--path_to_bam', type=click.Path(exists=False, file_okay=True, dir_okay=True, writable=False, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the bam file to be used') @click.option('--path_to_bed', type=click.Path(exists=False, file_okay=False, dir_okay=True, writable=False, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the directory containing the bed files to be used') @click.option('--output_path', type=click.Path(exists=False, file_okay=False, dir_okay=True, writable=True, readable=True, resolve_path=False, allow_dash=True, path_type=pathlib.Path, executable=False), help='path to the output directory') @click.option("--skip_read_fetching", is_flag=True, show_default=True, help='Boolean flag. When it is set, the fetching of the reads is skipped and the latest' 'timestamp of this run (identified by the id) is retrieved') @click.option("--exp_id", default=None, type=str, show_default=True, help="run id") @click.option("--window", default=1000, type=int, show_default=True, help="Integer describing the number of bases to be extracted around the middle point of an " "interval present in the bedfile") @click.option("--flanking_window", default=1000, type=int, show_default=True, help="Integer describing the number of bases to be extracted after the window") @click.option("--extra_bases", default=2000, type=int, show_default=True, help="Integer describing the number of bases to be extracted from the bamfile when removing the " "unused bases to be sure to get all the proper paires, which may be mapping up to 2000 bs") @click.option("--n_binding_sites", default=1000, type=int, show_default=True, help="number of intervals to be used to extract the signal, if it is higher then the provided" "intervals, all the intervals will be used") @click.option("--summarization_method", default="mean", type=click.Choice(["mean", "median", "max", "min"]), show_default=True, help=f"method to be used to summarize the signal: {{ mean, median, max, min }}") @click.option("--cores", default=1, type=int, show_default=True, help="number of cores to be used") @click.option("--window_size", default=5, type=int, show_default=True, help="window size to be used to compute the sliding_window_coverage") @click.option("--flip_based_on_strand", is_flag=True, show_default=True, default=False, help="flip the signal based on the strand") @click.option('--gc_correction_tag', type=str, default=None, help='tag to be used to extract gc coefficient per read from a bam file') def extract_sliding_window_coverage_in_batch(path_to_bam: pathlib.Path, path_to_bed: pathlib.Path, output_path: pathlib.Path, skip_read_fetching, window: int, flanking_window: int, extra_bases: int, n_binding_sites: int, summarization_method: str, cores: int, exp_id: Optional[str], window_size: int, flip_based_on_strand: bool = True, gc_correction_tag: Optional[str] = None): """ extract_sliding_window_coverage_in_batch extracts the fragment coverage using a sliding window approach to reduce the problems encountered when using low coverage samples and generates a signal for each BED provided file. """ read_fetcher_config = { "window": window, "flanking_region_window": flanking_window, "extra_bases": extra_bases, "n_binding_sites": n_binding_sites, } reads_transformer_config = {} single_signal_transformer_config = { "signal_transformer": "sliding_window_coverage", "window_size": window_size, "flip_based_on_strand": flip_based_on_strand, "gc_correction": True if gc_correction_tag else False, "tag": gc_correction_tag } transform_all_intervals_config = { "summarization_method": summarization_method, } plot_signal_config = {} save_signal_config = {} extra_config = { "cores": cores } output_path.mkdir(parents=True, exist_ok=True) output_path_interval_spec = output_path / f"{path_to_bam.stem}" / f"{path_to_bed.stem}" output_path_interval_spec.mkdir(parents=True, exist_ok=True) res = App(plugins_name=["coverage_in_batch", ], path_to_bam=path_to_bam, path_to_bed=path_to_bed, output_path=output_path_interval_spec or pathlib.Path.cwd(), skip_read_fetching=skip_read_fetching, read_fetcher_config=ReadFetcherConfig(read_fetcher_config), reads_transformer_config=Config(reads_transformer_config), single_signal_transformer_config=SingleSignalTransformerConfig(single_signal_transformer_config), transform_all_intervals_config=SignalSummarizer(transform_all_intervals_config), plot_signal_config=PlotConfig(plot_signal_config), save_signal_config=Config(save_signal_config), extra_config=AppExtraConfig(extra_config), id=exp_id).run() return res return [extract_coverage_in_batch, extract_coverage_around_dyads_in_batch, extract_middle_point_coverage_in_batch, extract_middle_point_coverage_in_batch, extract_middle_n_points_coverage_in_batch, extract_sliding_window_coverage_in_batch]
hook = FextractHooks() hook_cli = CliHook()