Source code for tffm_module

"""
Module implementing the TFFMs.

:platform: Unix
:synopsis: Define the class representing the Transcription Factor Flexible
    Models and the necessary functions to manipulate them.
:todo: Allow the construction of TFFMs using a different *de novo* motif
    finding tool than MEME.

"""


# TODO Allow the construction of TFFMs using a different *de novo* motif
# finding tool.


import sys
import os
import math
import re
from Bio.Alphabet import IUPAC
from Bio import SeqIO

#
# Changed to use the newer Bio.motifs package
# DJA 2013/04/05
#
#from Bio.Motif.Parsers import MEME
from Bio import motifs

import ghmm

import drawing
import exceptions_errors
import utils
import hit_module

from constants import ALPHABET, EXTENDED_ALPHABET, TFFM_KIND, LOGO_TYPE


[docs]class TFFM(ghmm.DiscreteEmissionHMM): """ Define the Transcription Factor Flexible Models. .. note:: Instances of this class have to be created through the functions :func:`tffm_from_xml` or :func:`tffm_from_meme`. """
[docs] def __init__(self, emission_domain, distribution, cmodel, kind, name="TFFM"): """ Construct an instance of the TFFM class. :arg emission_domain: The emission domain of the underlying :class:`ghmm.HMM`. :type emission_domain: ghmm.EmissionDomain :arg distribution: The distribution over the emission domain. :type distribution: :class:`ghmm.Distribution` :arg cmodel: The cmodel (HMM itself implemented in C) of the underlying :class:`ghmm.HMM`. :arg kind: The TFFM can be either a 0-order, a 1st-order, or a detailed TFFM, use `TFFM_KIND.ZERO_ORDER, or `TFFM_KIND.FIRST_ORDER`, or `TFFM_KIND.DETAILED` respectively. :type kind: Enum :arg name: Give the name of the TFFM. 'TFFM' is given by default. :type name: str :raises: :class:`exceptions.TFFMKindError` when the given kind is neither '1st-order' nor 'detailed'. """ # Construct the underlying ghmm.EmissionHMM super(ghmm.DiscreteEmissionHMM, self).__init__(emission_domain, distribution, cmodel) if(kind != TFFM_KIND.FIRST_ORDER and kind != TFFM_KIND.DETAILED and kind != TFFM_KIND.ZERO_ORDER): raise exceptions_errors.TFFMKindError(kind) self.kind = kind self.name = name
[docs] def __del__(self): """ Delete the underlying C structures. :note: The destruction is made using the :class:`ghmm.DiscreteEmissionHMM` destructor. """ super(ghmm.DiscreteEmissionHMM, self).__del__()
[docs] def __len__(self): """ Give the length of the TFFM, i.e. the number of nucleotides in the model excluding the background. """ if self.kind == TFFM_KIND.FIRST_ORDER: return self.N - 2 elif self.kind == TFFM_KIND.DETAILED: return self.N / 4 - 1 else: # 0-order HMM here return self.N - 1
[docs] def background_emission_proba(self): """ Return the emission probabilities of the nucleotides in the background state. :returns: A dictionnary with characters 'A', 'C', 'G', and 'T' as keys and the corresponding probabilities as values. :rtype: dict """ emissions = {'A': 0., 'C': 0., 'G': 0., 'T': 0.} if self.kind == TFFM_KIND.FIRST_ORDER: emissions = background_emission_proba_1storder(self) elif self.kind == TFFM_KIND.DETAILED: emissions = background_emission_proba_detailed(self) else: # 0-order HMM here emissions = background_emission_proba_detailed(self) for i in xrange(4): emissions[ALPHABET[i]] = self.getEmission(0)[i] return emissions
[docs] def get_positions_ic(self): """ Give the information content for every positions of the motif modeled by the TFFM. :returns: A list of floats giving the information contents of the positions. :rtype: list :note: The output is an ordered list following the order of the positions within the motif. """ previous_position_proba = self.background_emission_proba() positions_information_content = [] # Get the position of the first matching state in the TFFM. start = self.get_position_start() # Iterate over all matching position in the TFFM. for pos in xrange(start, len(self) + start): position_proba = {'A': 0., 'C': 0., 'G': 0., 'T': 0.} # Compute the emission probabilities for each nucleotide at the # current position if (self.kind == TFFM_KIND.DETAILED or self.kind == TFFM_KIND.FIRST_ORDER): for i in xrange(4): emissions = self.get_emission_update_pos_proba( position_proba, pos, previous_position_proba, i) else: # 0-order HMM here emissions = self.get_emission_update_pos_proba(position_proba, pos, previous_position_proba, 0) previous_position_proba = position_proba.copy() if self.kind == TFFM_KIND.DETAILED: somme = sum(previous_position_proba.values()) # For a detailed TFFM, one position is composed of 4 states so # we need to use proportions to compute proba. for i in xrange(4): previous_position_proba[ALPHABET[i]] /= somme # Compute the entropy given the corresponding probabilites and then # the underlying information content values = previous_position_proba.values() keys = previous_position_proba.keys() emissions = zip(values, keys) entropy = compute_entropy(emissions) positions_information_content.append(2.0 - entropy) return positions_information_content
[docs] def get_information_content(self): """ Give the information content of the whole TFFM. :returns: A float corresponding to the information content of the TFFM. :rtype: float """ pos_ic = self.get_positions_ic() return sum(pos_ic)
[docs] def final_states(self): """ Give the list of final states in the HMM (i.e. corresponding to the last matching position in the TFFM). :returns: A list of final states as int. :rtype: list """ if self.kind == TFFM_KIND.FIRST_ORDER: return [len(self) + 1] elif self.kind == TFFM_KIND.DETAILED: size = len(self) * 4 return [size, size + 1, size + 2, size + 3] else: # 0-order HMM here return [len(self)]
[docs] def train(self, training_file, epsilon=0.0001, max_iter=500): """ Train the TFFM using the fasta sequences to learn emission and transition probabilities. :note: The training of the underlying HMM is made using the Baum-Welsh algorithm. :arg training_file: The fasta file of the sequences to train the TFFM on. :type training_file: str :arg epsilon: The least relative improvement cut-off in likelihood compared to the previous iteration of the Baum-Welsh algorithm (default: 0.0001). :type epsilon: float :arg max_iter: The maximum number of iteration of the Baum-Welsh algorithm to reestimate the probabilities (default: 500). :type max_iter: int """ assert(os.path.isfile(training_file)) # Only upper case is allowed in the ALPHABET, need to convert sequences = [] for record in SeqIO.parse(training_file, "fasta"): sequence = record.seq.upper() # Only considering sequences with ACGTs if not re.search("[^AGCT]", str(sequence)): sequences.append(sequence) training_sequences = ghmm.SequenceSet(ghmm.Alphabet(ALPHABET), sequences) # Need to give the same weight to all the sequences since it does not # seem to be done by default by ghmm. utils.set_sequences_weight(training_sequences, 1.0) self.baumWelch(training_sequences, max_iter, epsilon)
[docs] def scan_sequence(self, sequence, threshold=0.0, only_best=False): """ Apply the TFFM on the fasta sequence and return the TFBS hits. :arg sequence: DNA sequence to apply the TFFM on. :type sequence: :class:`Bio.SeqRecord` :arg threshold: The threshold used to predict a hit (i.e. the minimal probability value for a position to be considered a TFBS hit) (default: 0.0). :type threshold: float :arg only_best: Argument to be set to :class:`True` if only the best TFBS hit per sequence is to be reported (default: :class:`False`) :type only_best: bool :returns: TFBS hits. :rtype: list of :class:`HIT` :note: (**0.0<=** *threshold* **<=1.0**) """ # Retrieve the hits on both the positive and the negative strands. hits_positive_strand = self._get_hits(sequence, threshold) hits_negative_strand = self._get_hits(sequence, threshold, negative=True) return merge_hits(hits_positive_strand, hits_negative_strand, only_best)
[docs] def scan_sequences(self, seq_file, threshold=0., only_best=False): """ Apply the TFFM on the fasta sequences and return the TFBS hits. :arg seq_file: Fasta file giving the DNA sequences to apply the TFFM on. :type seq_file: str :arg threshold: The threshold used to predict a hit (i.e. the minimal probability value for a position to be considered a TFBS hit) (default: 0.0). :type threshold: float :arg only_best: Argument to be set to :class:`True` if only the best TFBS hit per sequence is to be reported (default: :class:`False`) :type only_best: bool :returns: TFBS hits through a generator. :rtype: :class:`Generator` of :class:`HIT` :note: (**0.0<=** *threshold* **<=1.0**) """ sequence_list = utils.parse_fasta(seq_file) for seq_record in sequence_list: hits = self.scan_sequence(seq_record, threshold, only_best) for hit in hits: yield hit
[docs] def pocc_sequences(self, seq_file, threshold=0.): """ Apply the TFFM on the fasta sequences and return the Pocc value (probability of occupancy) for each sequence. :arg seq_file: Fasta file giving the DNA sequences to apply the TFFM on. :type seq_file: str :arg threshold: The threshold used to predict hits that will be used to compute the Pocc (default: 0.0). :type threshold: float :returns: Pocc values through a generator. :rtype: :class:`Generator` of :class:`HIT` :note: (**0.0<=** *threshold* **<=1.0**) """ sequence_list = utils.parse_fasta(seq_file) for seq_record in sequence_list: pocc = 1. for hit in self.scan_sequence(seq_record, threshold, False): if hit: pocc *= (1. - hit.score) yield hit_module.HIT(seq_record, 1, len(seq_record), None, pocc, self, None) # Not sure it is a good habit to trim in-place
[docs] def trim_in_place(self, threshold): """ Trim the current TFFM by removing edges with low information content. :arg threshold: The minimal information content value for an edge TFFM match position to be kept. :type threshold: float :warning: Trims the TFFM in place. To preserve the TFFM, use the :func:`get_trimmed` method which returns a trimmed copy of the TFFM but does not alter this TFFM. :see also: :func:`get_trimmed` """ first, last = self.get_significant_positions(threshold) # Only the underlying HMM is to be trimmed by trimming the # corresponding transition and emission probability matrices new_hmm = self._get_trimmed_hmm(first, last) # The trimming is made in-place so the instance attributes have to be # updated self.cmodel = new_hmm.cmodel self.N = new_hmm.cmodel.N self.M = new_hmm.cmodel.M self.model_type = new_hmm.cmodel.model_type self.maxorder = new_hmm.cmodel.maxorder
[docs] def get_trimmed(self, threshold, new_name="TFFM"): """ Trim the current TFFM by removing edges with low information content. :arg threshold: The minimal information content value for an edge TFFM match position to be kept. :type threshold: float :arg new_name: Name of the new TFFM to create (default:'TFFM'). :type new_name: str :returns: A TFFM corresponding to the current TFFM trimmed. :rtype: :class:`TFFM` :see also: :func:`trim_in_place` """ first, last = self.get_significant_positions(threshold) # Only the underlying HMM is to be trimmed by trimming the # corresponding transition and emission probability matrices new_hmm = self._get_trimmed_hmm(first, last) return TFFM(new_hmm.emissionDomain, new_hmm.distribution, new_hmm.cmodel, new_name, self.kind)
[docs] def _get_trimmed_hmm(self, first, last): """ Return the new trimmed HMM. :arg first: Position of the new first matching position. :type first: int :arg last: Position of the new last matching position. :type last: int :returns: The new trimmed HMM. :rtype: :class:`ghmm.DiscreteEmissionHMM` :todo: Raise an error rather than a :func:`sys.exit` when the trimmed HMM becomes empty. """ nb_states = last - first + 1 if nb_states < 0: # TODO raise an error rather than a sys.exit sys.exit("\nTrimmed HMM is empty!!!\n") # Retrieve the matrices of the current HMM matrices = self.asMatrices() transitions = matrices[0] emissions = matrices[1] # Trim the transition matrices and update the emission matrices wrt the # new first and last positions by iterating through all the states for index in xrange(self.N): transitions[index] = transitions[index][0:nb_states + 2] if index > 1 and index < nb_states + 2: emissions[index] = emissions[first + 1] first += 1 # Only keep the probabilities lying in the new matching positions transitions = transitions[0:nb_states + 2] transitions[nb_states + 1][nb_states + 1] = 0.0 transitions[nb_states + 1][1] = 1.0 emissions = emissions[0:nb_states + 2] initials = matrices[2][0:nb_states + 2] return ghmm.HMMFromMatrices(self.emissionDomain, self.distribution, transitions, emissions, initials)
[docs] def get_significant_positions(self, threshold): """ Get the first and last significant position the TFFM where the insignificant positions are the ones on the edges with low information content. :arg threshold: The minimal information content to consider a position to be significant. :type threshold: float :returns: The positions of the first and last positions that are to be considered significant (given in this order). :rtype: tuple """ pos_ic = self.get_positions_ic() first = 1 last = len(self) while first <= last and pos_ic[first - 1] < threshold: first += 1 while last > 0 and pos_ic[last - 1] < threshold: last -= 1 return first, last
[docs] def _get_posterior_proba(self, sequence_split): """ Get the posterior probabilities at each nucleotide position given the TFFM. :arg sequence_split: The sequence splitted in subsequences to not consider non ACGT nucleotides. :type sequence_split: list :returns: The posterior probabilities at each position of the sequence. :rtype: list of list :note: One example of a sequence_split is ["ACT", "N", "ATC"]. """ ghmm_extended_alphabet = ghmm.Alphabet(EXTENDED_ALPHABET) posterior_proba = [] # null probabilities for non ACGT nucleotides. null_proba = [0.] * self.N for sequence in sequence_split: if re.match("[ACGT]", sequence): emission_sequence = ghmm.SequenceSet(ghmm_extended_alphabet, [sequence])[0] posterior_proba.extend(self.posterior(emission_sequence)) else: for __ in xrange(len(sequence)): posterior_proba.append(null_proba) return posterior_proba
[docs] def _get_hits(self, seq_record, threshold, negative=False): """ Predict TFBS hits in the sequence given the TFFM. :arg seq_record: The sequence on which to predict TFBS hits. :type seq_record: :class:`Bio.SeqRecord` :arg threshold: The minimal probability to predict a position as a TFBS hit. :type threshold: float :arg negative: A boolean stating if the TFBS hits are to be predicted on the positive or the negative strand of the sequence. Set to True when on the negative strand (default: False). :type negative: bool :returns: The list of TFBS hits predicted on the sequence strand. :rtype: list of :class:`HIT` """ sequence = seq_record.seq.upper() if negative: sequence = seq_record.reverse_complement().seq sequence_split = re.split("([ACGT]+)", str(sequence)) posterior_proba = self._get_posterior_proba(sequence_split) hits = self._construct_hits(posterior_proba, seq_record, threshold, negative) return hits
[docs] def get_emission_update_pos_proba(self, position_proba, position, previous_position_proba, index): """ Get the emission probabilities of ACGT at position *position* and update the emission probabilities in *position_proba* given the emission probabilities at the previous position (*previous_position_proba*). :note: This function is used state by state and several states represent the same position in detailed TFFM, this is why we need to update the probabilities listed in position_proba. :arg position_proba: Probabilities of getting ACGT at the current position that need to be updated. :type position_proba: dict :arg position: Current position in the motif. :type position: int :arg previous_position_proba: Probabilities of getting ACGT at the previous position. :type previous_position_proba: dict :arg index: Represents the index of the state of the TFFM to be analyzed at the current position. :returns: The emission probabilities of ACGT by the state indexed by *index* at position *position* in the TFFM. :rtype: list """ emissions_dic = {'A': 0., 'C': 0., 'G': 0., 'T': 0.} if self.kind == TFFM_KIND.FIRST_ORDER: for j in xrange(4): letter = ALPHABET[j] emissions_dic[letter] = self.getEmission(position)[index * 4 + j] proba = self.getEmission(position)[j * 4 + index] proba *= previous_position_proba[letter] position_proba[ALPHABET[index]] += proba emissions = zip(emissions_dic.values(), emissions_dic.keys()) elif self.kind == TFFM_KIND.DETAILED: for j in xrange(4): start_state = (position - 1) * 4 + index end_state = position * 4 + j emissions_dic[ALPHABET[j]] = self.getTransition(start_state, end_state) proba = previous_position_proba[ALPHABET[index]] proba *= self.getTransition(start_state, end_state) position_proba[ALPHABET[j]] += proba somme = sum(emissions_dic.values()) emissions = zip(emissions_dic.values(), emissions_dic.keys()) emissions = map(lambda (x, y): (x / somme, y), emissions) else: # 0-order HMM here for j in xrange(4): letter = ALPHABET[j] emissions_dic[letter] = self.getEmission(position)[j] proba = self.getEmission(position)[j] position_proba[ALPHABET[j]] += proba emissions = zip(emissions_dic.values(), emissions_dic.keys()) emissions.sort(reverse=True) return emissions
[docs] def _construct_hits(self, posterior_proba, seq_record, threshold, negative): """ Compute the TFBS hits on a sequence given the posterior probabilities and construct the corresponding instances of :class:`HIT`. :arg posterior_proba: The posterior probabilities at each position of the sequence computed given the tffm. :type posterior_proba: list of list of float :arg seq_record: The sequence on which to predict TFBS hits. :type seq_record: :class:`Bio.SeqRecord` :arg threshold: The minimal probability to predict a position as a TFBS hit. :type threshold: float :arg negative: A boolean stating if the TFBS hits are to be predicted on the positive or the negative strand of the sequence. Set to True when on the negative strand. :type negative: bool :returns: The list of TFBS hits predicted on the sequence strand. :rtype: list of :class:`HIT` """ hits = [None] * len(seq_record) # Iterate over all the positions of the sequences by starting at the # len(self) position since we need to read at least len(self) # nucleotides to predict a potential TFBS hit. for position in xrange(len(self) - 1, len(posterior_proba)): best_proba = 0.0 # Iterate over the final states of the TFFM to predict TFBS hits by # looking at the posterior probabilities for these states only at # this specific position for state in self.final_states(): proba = posterior_proba[position][state] # We want to only keep the final state giving the maximal # posterior probability. if proba > threshold and proba > best_proba: best_proba = proba start, end, strand = hit_module.get_start_end_strand( position, seq_record, self, negative) hits[end - 1] = hit_module.HIT(seq_record, start, end, strand, proba, self, state) return hits
[docs] def get_position_start(self): """ Give the position of the first matching state. :returns: The position of the first matching state of the TFFM. :rtype: float :warning: The position is given 0-based. """ if self.kind == TFFM_KIND.FIRST_ORDER: return 2 else: # Both detailed and 0-order return 1
[docs]def background_emission_proba_1storder(tffm): emissions = {'A': 0., 'C': 0., 'G': 0., 'T': 0.} last_emissions = {'A': 0., 'C': 0., 'G': 0., 'T': 0.} # Retrieve emission proba for the first state which is not # 1st-order for i in xrange(4): last_emissions[ALPHABET[i]] = tffm.getEmission(0)[i] # Compute emission proba for the background for i in xrange(4): for j in xrange(4): proba = tffm.getEmission(1)[j * 4 + i] proba *= last_emissions[ALPHABET[j]] emissions[ALPHABET[i]] += proba return emissions
[docs]def background_emission_proba_detailed(tffm): emissions = {'A': 0., 'C': 0., 'G': 0., 'T': 0.} # Compute emission proba for the background for i in xrange(4): for j in xrange(4): proba = tffm.getTransition(i, j) * 0.25 emissions[ALPHABET[j]] += proba somme = sum(emissions.values()) for i in xrange(4): # Need to divide by somme since four states are representing # the same position within a detailed TFFM emissions[ALPHABET[i]] /= somme return emissions
[docs]def best_hit(hits_positive_strand, hits_negative_strand): """ Give the best hit in a sequence by considering both positive and negative strands. :arg hits_positive_strand: The list of hits on the positive strand. :type hits_positive_strand: list of :class:`HIT` :arg hits_negative_strand: The list of hits on the negative strand. :type hits_negative_strand: list of :class:`HIT` :returns: The best hit (None if no hit). :rtype: :class:`HIT` """ max_positive_strand = max(hits_positive_strand) max_negative_strand = max(hits_negative_strand) if max_positive_strand or max_negative_strand: return max(max_positive_strand, max_negative_strand) else: return None
[docs]def merge_hits(hits_positive_strand, hits_negative_strand, only_best): """ Merges the hits from both strands. :arg hits_positive_strand: The list of hits on the positive strand. :type hits_positive_strand: list of :class:`HIT` :arg hits_negative_strand: The list of hits on the negative strand. :type hits_negative_strand: list of :class:`HIT` :arg only_best: Boolean set to True only if the best TFBS hit in the sequence is to be kept. :type only_best: bool :returns: A list containing the TFBS hits (empty if no hit). :rtype: list :note: The two input lists are required to be ordered following the positions on the sequence. The best hit per position is given. When no hit has been found at a position, the constant None is used. """ if only_best: return [best_hit(hits_positive_strand, hits_negative_strand)] else: return [hit for hit in utils.roundrobin(hits_positive_strand, hits_negative_strand)]
[docs]def compute_entropy(emissions): """ Compute the entropy given the emission probabilities of the ACGT nucleotides. :arg emissions: Emission probabilities of the ACGT nucleotides. :type emissions: list of float :returns: The computed entropy. :rtype: float :warning: The list gives the probabilities corresponding to A, C, G, and T **in this order**. """ entropy = 0. for i in xrange(4): proba, __ = emissions[i] if proba != 0.: entropy += proba * math.log(proba, 2) return -entropy
[docs]def tffm_from_xml(xml, kind): """ Construct a TFFM described in an XML file. :arg xml: File containing the TFFM description in XML format. :type xml: str :arg kind: Type of TFFM to construct between '1st-order' and 'detailed'. :type kind: str :returns: The TFFM described in the XML file. :rtype: :class:`TFFM` """ hmm = ghmm.HMMOpen(xml) return TFFM(hmm.emissionDomain, hmm.distribution, hmm.cmodel, kind)
[docs]def create_1storder_hmm(nb_seq, nb_residues, first_letters, motif): """ Create a 1st-order HMM initialized from MEME result :arg nb_seq: Number of sequences used by MEME :type nb_seq: int :arg nb_residues: Number of residues used by MEME :type nb_residues: int :arg first_letters: Number of occurrences of ACGT at the begining of sequences used by MEME :type first_letters: dic of str->int :arg motif: PFM as a Biopython motif to be used to initialize the TFFFM :type motif: :class:`Bio.motifs` :returns: The constructed HMM :rtype: :class:`ghmm.DiscreteEmissionHMM` """ # Very first state is created with the initial nt frequencies and a '1' # pseudocount emissions = [[(first_letters['A'] + 1.) / (nb_seq + 4.), (first_letters['C'] + 1.) / (nb_seq + 4.), (first_letters['G'] + 1.) / (nb_seq + 4.), (first_letters['T'] + 1.) / (nb_seq + 4.)]] # The second state is random emissions.append([0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25]) # Complete the emissions with the actual motif frequencies if motif.instances: # The motif.counts is computed directly when creating the motif from # instances nb_hits = len(motif.instances) else: nb_hits = nb_seq for position in xrange(len(motif)): frequencies = [] for letter in "ACGT": freq = (motif.counts[letter][position] + 1.) / (nb_hits + 4.) frequencies.append(freq) emissions.append(frequencies * 4) # Background transitions transitions = [[0.0, 1.0] + [0.0] * len(motif)] background_to_background = 1. - float(nb_seq) / nb_residues background_to_foreground = 1. - background_to_background # Core transitions transitions.append( [0., background_to_background, background_to_foreground] + [0.] * (len(motif) - 1)) for position in xrange(1, len(motif)): transitions.append( [0.] * (position + 2) + [1.] + [0.] * (len(motif) - position - 1)) # Final transitions now transitions.append([0., 1.] + [0.] * len(motif)) # Starting proba initials = [1.] + [0.] * (len(motif) + 1) return ghmm.HMMFromMatrices(ghmm.Alphabet(ALPHABET), ghmm.DiscreteDistribution( ghmm.Alphabet(ALPHABET)), transitions, emissions, initials)
[docs]def create_0order_hmm(nb_seq, nb_residues, first_letters, motif): """ Create a 0-order HMM initialized from MEME result :arg nb_seq: Number of sequences used by MEME :type nb_seq: int :arg nb_residues: Number of residues used by MEME :type nb_residues: int :arg first_letters: Number of occurrences of ACGT at the begining of sequences used by MEME :type first_letters: dic of str->int :arg motif: PFM as a Biopython motif to be used to initialize the TFFFM :type motif: :class:`Bio.motifs` :returns: The constructed HMM :rtype: :class:`ghmm.DiscreteEmissionHMM` """ # The first state is random emissions = [[0.25, 0.25, 0.25, 0.25]] # Complete the emissions with the actual motif frequencies if motif.instances: # The motif.counts is computed directly when creating the motif from # instances nb_hits = len(motif.instances) else: nb_hits = nb_seq for position in xrange(len(motif)): frequencies = [] for letter in "ACGT": freq = (motif.counts[letter][position] + 1.) / (nb_hits + 4.) frequencies.append(freq) emissions.append(frequencies) # Background transitions transitions = [] background_to_background = 1. - float(nb_seq) / nb_residues background_to_foreground = 1. - background_to_background transitions.append( [background_to_background, background_to_foreground] + [0.] * (len(motif) - 1)) # Core transitions for position in xrange(1, len(motif)): transitions.append( [0.] * (position + 1) + [1.] + [0.] * (len(motif) - position - 1)) # Final transitions now transitions.append([1.] + [0.] * len(motif)) # Starting proba initials = [1.] + [0.] * len(motif) return ghmm.HMMFromMatrices(ghmm.Alphabet(ALPHABET), ghmm.DiscreteDistribution( ghmm.Alphabet(ALPHABET)), transitions, emissions, initials)
[docs]def create_detailed_hmm(nb_seq, nb_residues, first_letters, motif): """ Create a detailed HMM initialized from MEME result :arg nb_seq: Number of sequences used by MEME :type nb_seq: int :arg nb_residues: Number of residues used by MEME :type nb_residues: int :arg first_letters: Number of occurrences of ACGT at the begining of sequences used by MEME :type first_letters: dic of str->int :arg motif: PFM as a Biopython motif to be used to initialize the TFFFM :type motif: :class:`Bio.motifs` :returns: The constructed HMM :rtype: :class:`ghmm.DiscreteEmissionHMM` """ # Starting proba with the starting nucleotides of the sequences and a '1' # pseudocount added initials = [(first_letters['A'] + 1.) / (nb_seq + 4.), (first_letters['C'] + 1.) / (nb_seq + 4.), (first_letters['G'] + 1.) / (nb_seq + 4.), (first_letters['T'] + 1.) / (nb_seq + 4.), 1. / nb_seq, 1. / nb_seq, 1. / nb_seq, 1. / nb_seq] initials += [0.] * 4 * (len(motif) - 1) # Emission proba emissions = [[1., 0., 0., 0.], [0., 1., 0., 0.], [0., 0., 1., 0.], [0., 0., 0., 1.]] * (len(motif) + 1) # Background transitions proba if motif.instances: # The motif.counts is computed directly when creating the motif from # instances nb_hits = len(motif.instances) else: nb_hits = nb_seq background_to_background = 1. - float(nb_seq) / nb_residues background_to_foreground = 1. - background_to_background background_to_background /= 4. transi = {} for letter in "ACGT": freq = (motif.counts[letter][0] + 1.) / (nb_hits + 4.) transi[letter] = freq * background_to_foreground transitions = [] for __ in xrange(4): transitions.append([background_to_background, background_to_background, background_to_background, background_to_background, transi['A'], transi['C'], transi['G'], transi['T']] + [0.] * 4 * (len(motif) - 1)) pfm = [(v + 1.) / (nb_hits + 4.) for letter in 'ACGT' for v in motif.counts[letter]] for position in xrange(len(motif) * 4): transitions.append([0.] * 4 * (len(motif) + 1)) for position in xrange(1, len(motif)): for line in xrange(4 * (position - 1) + 1, 4 * (position - 1) + 5): for column in xrange(4 * position + 1, 4 * position + 5): index = (column - (4 * position + 1)) * len(motif) + position transitions[line + 3][column + 3] = pfm[index] for index in xrange(4): state = len(motif) * 4 + index transitions[state][0] = 0.25 transitions[state][1] = 0.25 transitions[state][2] = 0.25 transitions[state][3] = 0.25 return ghmm.HMMFromMatrices(ghmm.Alphabet(ALPHABET), ghmm.DiscreteDistribution( ghmm.Alphabet(ALPHABET)), transitions, emissions, initials)
[docs]def tffm_from_meme(meme_output, kind, name="TFFM"): """ Construct a TFFM from the output of MEME on ChIP-seq data. :arg meme_output: File containing the output of MEME. :type meme_output: str :arg kind: Type of TFFM to construct between '1st-order' and 'detailed'. :type kind: str :arg name: Name of the TFFM (default: "TFFM") :type name: str :returns: The TFFM initialized from MEME results. :rtype: :class:`TFFM` :note: As the PFM is used to initialize the TFFM, a pseudocount of 1 is added to all the values in the PFM """ record = motifs.parse(open(meme_output), 'MEME') if record.alphabet != IUPAC.unambiguous_dna: sys.exit("### Wrong alphabet used in MEME ###") motif = record[0] nb_seq, nb_res, first_letters = utils.get_sequences_info(record.datafile) if kind == TFFM_KIND.FIRST_ORDER: hmm = create_1storder_hmm(nb_seq, nb_res, first_letters, motif) elif kind == TFFM_KIND.DETAILED: hmm = create_detailed_hmm(nb_seq, nb_res, first_letters, motif) else: # 0-order HMM here hmm = create_0order_hmm(nb_seq, nb_res, first_letters, motif) return TFFM(hmm.emissionDomain, hmm.distribution, hmm.cmodel, kind, name)
[docs]def tffm_from_motif(motif, kind, name="TFFM", nb_res=None, first_letters=None): """ Construct an initialized TFFM from a PFM. :arg motif: PFM as a Biopython motif to be used to initialize the TFFFM :type motif: :class:`Bio.motifs` :arg kind: Type of TFFM to construct between '1st-order' and 'detailed' :type kind: str :arg name: Name of the TFFM (default: "TFFM") :type name: str :arg nb_res: Number of residues to be used to compute the background->foreground transition probabilities (default: 0 meaning that we will assume a 100nt length for the ChIP-seq used to derive the PFM) :type nb_residues: int :arg first_letters: Number of occurrences of ACGT at the begining of sequences in the background (default: default values will give equiprobabilities for A, C, G, and T :type first_letters: dic of str->int :returns: The TFFM initialized from the PFM :rtype: :class:`TFFM` :see also: :func:`tffm_from_meme` :note: As the PFM is used to initialize the TFFM, a pseudocount of 1 is added to all the values in the PFM """ # extract the counts in the first column of the PFM first_counts = [count[0] for count in motif.counts.values()] nb_seq = reduce(lambda x, y: x + y, first_counts) if not first_letters: equi = nb_seq / 4 first_letters = {'A': equi, 'C': equi, 'G': equi, 'T': equi} if not nb_res: nb_res = nb_seq * 100 if kind == TFFM_KIND.FIRST_ORDER: hmm = create_1storder_hmm(nb_seq, nb_res, first_letters, motif) elif kind == TFFM_KIND.DETAILED: hmm = create_detailed_hmm(nb_seq, nb_res, first_letters, motif) else: # 0-order TFFM here hmm = create_0order_hmm(nb_seq, nb_res, first_letters, motif) return TFFM(hmm.emissionDomain, hmm.distribution, hmm.cmodel, kind, name)