Source code for lbfextract.fextract.signal_transformer

import typing as t

import bottleneck
import numpy as np
import pysam

from lbfextract.utils import adapt_indices


[docs] class GenomiIntervalDataFrameRow(t.NamedTuple): Start: int End: int Chromosome: str reads_per_interval: t.Iterator[pysam.AlignedSegment]
[docs] class TFBSCoverage: """ This class calculates the fragment coverage for a genomic interval and allow the possibility to correct for GC bias when a GC bias specific tag was added to each read in a BAM file. """ def __init__(self, gc_correction: bool = False, tag: str = None) -> None: """ : gc_correction : boolean value describing whether GC correction should be performed : tag : BAM file tag to be used to correct for GC bias """ self.gc_correction = gc_correction self.tag = tag def __call__(self, x: GenomiIntervalDataFrameRow) -> np.array: """ : x : NamedTuple containing the genomic interval information and an iterator over the reads present in it """ start = x.Start end = x.End region_length = end - start tensor = np.zeros(region_length) for read in x.reads_per_interval: gc_coef = read.get_tag(self.tag) if self.gc_correction and read.has_tag(self.tag) else 1 relative_start = read.pos - start index = adapt_indices(relative_start, relative_start + read.template_length, region_length) if index is not None: tensor[index] += 1 * gc_coef return tensor
[docs] class TFBSCoverageAroundDyads: def __init__(self, n=1, gc_correction: bool = False, tag: str = None, peaks: list = None): self.n = n self.gc_correction = gc_correction self.tag = tag self.peaks = peaks
[docs] def get_relative_start_end(self, read, start) -> list: relative_start = read.pos - start f = np.abs(read.template_length) s = [] nucleosome_length = self.peaks[0] + 5 n_of_possible_nucleosomes = f // nucleosome_length remainder = f % nucleosome_length p_same = ((nucleosome_length - remainder) / nucleosome_length) p_next = 1 - p_same n_of_possible_nucleosomes = np.random.choice( [n_of_possible_nucleosomes, n_of_possible_nucleosomes + 1], p=[p_same, p_next] ) def get_expanded_length(n_of_possible_nucleosomes_, nucleosome_length_): if n_of_possible_nucleosomes_ <= 1: return nucleosome_length_ else: return n_of_possible_nucleosomes_ * nucleosome_length_ def get_nucleosome_centers(n_of_possible_nucleosomes_, nucleosome_length_): if n_of_possible_nucleosomes_ <= 1: return [nucleosome_length_ // 2] elif n_of_possible_nucleosomes_ == 2: return [nucleosome_length_ // 2, nucleosome_length_ * 3 - (nucleosome_length_ // 2)] elif n_of_possible_nucleosomes_ == 3: return [nucleosome_length_ // 2, (nucleosome_length_ * 3) // 2, nucleosome_length_ * 3 - (nucleosome_length_ // 2)] else: return [nucleosome_length_ // 2 + (i * nucleosome_length_) for i in range(n_of_possible_nucleosomes_)] expanded_fragment_length = get_expanded_length(n_of_possible_nucleosomes, nucleosome_length) midpoint = relative_start + (f // 2) relative_expanded_start = midpoint - (expanded_fragment_length // 2) centers = get_nucleosome_centers(n_of_possible_nucleosomes, nucleosome_length) for center in centers: n_s = relative_expanded_start + center - self.n n_e = relative_expanded_start + center + self.n s.append([n_s, n_e]) return s
def __call__(self, x: GenomiIntervalDataFrameRow): start = x.Start end = x.End region_length = end - start tensor = np.zeros(region_length) for read in x.reads_per_interval: gc_coef = read.get_tag(self.tag) if self.gc_correction and read.has_tag(self.tag) else 1 relative_starts_ends_list = self.get_relative_start_end(read, start) for n_s, n_e in relative_starts_ends_list: index_col = adapt_indices(n_s, n_e, region_length) if index_col is not None: tensor[index_col] += 1 * gc_coef return tensor
[docs] class TFBSMiddlePointCoverage: def __init__(self, gc_correction: bool = False, tag: str = None): self.gc_correction = gc_correction self.tag = tag def __call__(self, x: GenomiIntervalDataFrameRow): start = x.Start end = x.End region_length = end - start tensor = np.zeros(region_length) for read in x.reads_per_interval: gc_coef = read.get_tag(self.tag) if self.gc_correction and read.has_tag(self.tag) else 1 relative_start = read.pos - start middle_point = relative_start + (read.template_length // 2) indices = adapt_indices(middle_point, middle_point + 1, region_length) if indices is not None: tensor[indices] += 1 * gc_coef return tensor
[docs] class TFBSNmiddlePointCoverage: def __init__(self, n=1, gc_correction: bool = False, tag: str = None): self.n = n self.gc_correction = gc_correction self.tag = tag def __call__(self, x: GenomiIntervalDataFrameRow): start = x.Start end = x.End region_length = end - start tensor = np.zeros(region_length) for read in x.reads_per_interval: gc_coef = read.get_tag(self.tag) if self.gc_correction and read.has_tag(self.tag) else 1 relative_start = read.pos - start middle_point = relative_start + (read.template_length // 2) indices = adapt_indices(middle_point - self.n, middle_point + self.n, region_length) if indices is not None: tensor[indices] += 1 * gc_coef return tensor
[docs] class TFBSSlidingWindowCoverage: def __init__(self, window_size: int, gc_correction: bool = False, tag: str = None): self.window_size = window_size self.gc_correction = gc_correction self.tag = tag def __call__(self, x: GenomiIntervalDataFrameRow): start = x.Start end = x.End region_length = end - start tensor = np.zeros(region_length) for read in x.reads_per_interval: gc_coef = read.get_tag(self.tag) if self.gc_correction and read.has_tag(self.tag) else 1 relative_start = read.pos - start indices = adapt_indices(relative_start, relative_start + read.template_length, region_length) if indices is not None: tensor[indices] += 1 * gc_coef tensor = bottleneck.move_mean(tensor, window=self.window_size, min_count=1) return tensor
[docs] class FragmentLengthDistribution: def __init__(self, min_fragment_length=100, max_fragment_length=400, gc_correction: bool = False, tag: str = None): self.min_fragment_length = min_fragment_length self.max_fragment_length = max_fragment_length self.gc_correction = gc_correction self.tag = tag def __call__(self, x: GenomiIntervalDataFrameRow): start = x.Start end = x.End region_length = end - start relative_fragment_length = self.max_fragment_length - self.min_fragment_length tensor = np.zeros((relative_fragment_length + 1, region_length)) for read in x.reads_per_interval: gc_coef = read.get_tag(self.tag) if self.gc_correction and read.has_tag(self.tag) else 1 fragment_length = read.template_length if self.min_fragment_length <= fragment_length <= self.max_fragment_length: relative_start = read.pos - start indices = adapt_indices(relative_start, relative_start + read.template_length, region_length) if indices is not None: tensor[relative_fragment_length, indices] += 1 * gc_coef return tensor
[docs] class PeterUlzCoverage: def __init__(self, gc_correction: bool, tag: str, read_start: int = 53, read_end: int = 113): self.gc_correction = gc_correction self.tag = tag self.read_start = read_start self.read_end = read_end def __call__(self, x: GenomiIntervalDataFrameRow): start = x.Start end = x.End region_length = end - start tensor = np.zeros(region_length) for read in x.reads_per_interval: gc_coef = read.get_tag(self.tag) if self.gc_correction and read.has_tag(self.tag) else 1 relative_start = read.pos - start relative_end = relative_start + read.template_length first_read_dyad_pos = [relative_start + self.read_start, relative_start + self.read_end] second_read_dyad_pos = [relative_end - self.read_end, relative_end - self.read_start] for relative_start, relative_end in [first_read_dyad_pos, second_read_dyad_pos]: indices = adapt_indices(relative_start, relative_end, region_length) if indices is not None: tensor[indices] += 1 * gc_coef return tensor
[docs] class WPSCoverage(TFBSCoverage): def __init__(self, gc_correction: bool = False, tag: str = None, window_size: int = None, min_fragment_length: int = None, max_fragment_length: int = None): super().__init__(gc_correction, tag) self.window_size = window_size self.min_fragment_length = min_fragment_length self.max_fragment_length = max_fragment_length
[docs] def get_minus_one_indices(self, relative_start, relative_end, region_length): starting_start = relative_start - self.window_size // 2 starting_end = relative_start + self.window_size // 2 ending_start = relative_end - self.window_size // 2 ending_end = relative_end + self.window_size // 2 indices_start = adapt_indices(starting_start, starting_end, region_length) indices_end = adapt_indices(ending_start, ending_end, region_length) return indices_start, indices_end
def __call__(self, x: GenomiIntervalDataFrameRow): start = x.Start end = x.End region_length = end - start regions_minus_one = np.zeros(region_length) regions_plus_one = np.zeros(region_length) for read in x.reads_per_interval: if not self.min_fragment_length < read.template_length <= self.max_fragment_length: continue gc_coef = read.get_tag(self.tag) if self.gc_correction and read.has_tag(self.tag) else 1 relative_start = read.pos - start relative_end = relative_start + read.template_length indices = adapt_indices(relative_start + (self.window_size // 2), relative_end - (self.window_size // 2), region_length) indices_start, indices_end = self.get_minus_one_indices(relative_start, relative_end, region_length) if indices_start is not None: regions_minus_one[indices_start] += 1 * gc_coef if indices_end is not None: regions_minus_one[indices_end] += 1 * gc_coef if indices is not None: regions_plus_one[indices] += 1 * gc_coef wps_g = (regions_plus_one - regions_minus_one) return wps_g - bottleneck.move_median(wps_g, window=100, min_count=1)