123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- """
- Small assembly module based on de bruijn graphs
- """
- import os
- import sys
- import statistics
- import networkx as nx
- from networkx import algorithms
-
- def read_fastq(fichier):
- """
- Returns an iterator object that retrieves only the nucleic sequences of a
- fastq file.
- Arguments:
- fichier, str: path to fastq file
-
- Returns:
- a str generator, generator of sequences
- """
- with open(fichier, 'r') as filin:
- for line in filin:
- yield filin.readline().strip()
- filin.readline()
- filin.readline()
-
- def cut_kmer(seq, k):
- """
- Returns an iterator that returns k-mers of k-size of a sequence
- Arguments:
- seq, str: a sequence
- k, int: k-mer size, must be shorter than len(seq)
-
- Returns:
- an iterator returning str
- """
- for i in range(len(seq)-(k-1)):
- yield seq[i:i+k]
-
- def build_kmer_dict(fichier, k):
- """
- Arguments:
- fichier, str: path to fastq file
- k, int: k-mer size, must be shorter than len(seq)
-
- Return:
- hash_table, dict: dictionnary with key = k-mer as str
- and value count of k-mer occurence
- """
- hash_table = {}# initialise empty hash table
- it_fastq = read_fastq(fichier)
- for seq in it_fastq: # for each sequence
- it_kmer = cut_kmer(seq, k) # count each occurence of k-mer
- for kmer in it_kmer:
- try:
- hash_table[kmer]
- except KeyError:
- hash_table[kmer] = 1
- else:
- hash_table[kmer] += 1
-
- return hash_table
-
- def build_graph(hash_table):
- """
- Returns a graph from a hash table
-
- Arguments:
- hash_table, dict: dictionnary obtained with build_kmer_dict() function
- Return:
- graph, nx.DiGraph: the de Bruijn tree corresponding to hash_table
- """
- graph = nx.DiGraph()
- for key in hash_table:
- graph.add_edge(key[:-1], key[1:], weight=hash_table[key])
-
- return graph
-
- def get_starting_nodes(graph):
- """
- Returns the list of starting nodes of a graph
-
- Arguments:
- graph, nx.DiGraph: de Bruijn tree
-
- Return:
- starting_nodes, list of strings: list of starting nodes
- """
- starting_nodes = []
- for node in graph:
- if graph.in_degree(node) == 0: # if count of input edge == 0
- starting_nodes.append(node)
-
- return starting_nodes
-
- def std(values):
- """
- Computes standard deviation from a list of value
- Arguments:
- values, list: list of values
-
- Returns :
- standard deviation of the 'values' data list
- """
- return statistics.stdev(float(values))
-
-
-
- def get_sink_nodes(graph):
- """
-
- Arguments:
- graph, nx.DiGraph: de Bruijn tree
-
- Return:
- sink_nodes, list of strings: list of terminal nodes
- """
- sink_nodes = []
- for node in graph:
- if graph.out_degree(node) == 0: # if count of output edge == 0
- sink_nodes.append(node)
-
- return sink_nodes
-
-
- def path_average_weight(graph, path):
- """
- Arguments:
- graph, nx.DiGraph: a de bruijn graph
- path, list of str(nodes): list of nodes constituing a path
-
- Return:
- mean weight, float: the mean weight of the path
- """
- weight = 0
- for i in range(len(path)-1):
- weight += graph.edges[path[i], path[i+1]]["weight"]
-
- weight = weight/(len(path) - 1)
-
- return weight
-
- def remove_paths(graph, paths, delete_entry_node=False, delete_sink_node=False):
- """
- Removes paths in 'paths' list from 'graph' object. Don't delete entry and
- sink nodes unless specified.
-
- Arguments:
- graph, nd.DiGraph(): de bruijn graph
- paths, list of string lists: list of paths composed of nodes
- delete_entry_node, boolean: Delete entry if True
- delete_sink_node, boolean: Delete sink node if True
-
- Return:
- graph, nd.DiGraph(): de bruijn graph with deletes paths.
- """
- entry = 1
- sink = -1
- if delete_entry_node == True:
- entry = 0
- if delete_sink_node == True:
- sink = None
- for path in paths:
- graph.remove_nodes_from(path[entry:sink])
-
- return graph
-
-
- def select_best_path(graph, paths, path_lens, mean_weights,
- delete_entry_node=False, delete_sink_node=False):
- """
- Given path list, their length and weight, keeps only the best path among
- them considering the following priority : weight, length and randomly
-
- Arguments:
- graph, nx.DiGraph: a de bruijn graph
- paths, list of str: list of paths
- path_lens: lengths of the paths
- mean_weights: mean weights of the paths
- delete_entry_node, boolean: either or not if the entry node
- should be deleted
- delete_sink_node, boolean: either or not if the sink node
- should be deleted
-
- Returns:
- graph, nx.DiGraph: graph with deleted paths
- """
- max_weight = max(mean_weights)
- heaviest = [i for i, j in enumerate(mean_weights) if j == max_weight]
- if len(heaviest) > 1:
- max_len = max(path_lens)
- longest = [i for i in heaviest if path_lens[i] == max_len]
- if len(longest) > 1:
- Random.seed(9001)
- best = random.choice[longest]
- else:
- best = longest[0]
- else:
- best = heaviest[0]
-
- for p in paths:
- print(p)
-
- paths2 = [p for p in paths]
- paths2.pop(best)
-
- return remove_paths(graph, paths2, delete_entry_node, delete_sink_node)
-
-
- def fill(text, width=80):
- """Split text with a line return to respect fasta format"""
- return os.linesep.join(text[i:i+width] for i in range(0, len(text), width))
-
- def save_contigs(tuples, outname):
- """
- Arguments:
- tuples, tuple: Obtained from get_contigs()
- outname, str: name of the file to be written
- """
- i = 0
- with open(outname, "w") as outfile:
- for duo in tuples:
- i += 1
- outfile.write(">contig_{} len={}\n".format(i, duo[1]))
- outfile.write("{}\n".format(fill(duo[0])))
- return
-
-
- def get_contigs(graph, starting_nodes, sink_nodes):
- """
- Arguments:
- graph, nx.DiGraph: de Bruijn tree
- starting_nodes, list of strings: list of starting nodes
- sink_nodes, list of strings: list of terminal nodes
-
- Return:
- contigs, list of tupple: list of tupple (contigs, len(contigs))
- """
- contigs = []
- for starting_node in starting_nodes:
- for sink_node in sink_nodes:
- if algorithms.has_path(graph, starting_node, sink_node) == True:
- path = algorithms.shortest_path(graph, starting_node, sink_node)
- contig = path[0] # base of the contig is seq of the first node
- for i in range(len(path)-1):
- contig += path[i+1][-1] # adds last char of node
- contigs.append((contig, len(contig)))
-
- return contigs
-
- def solve_bubble(graph, ancestor_node, descendent_node):
- """
- solve a bubble
-
- Arguments:
- graph, nx.DiGraph: a de bruijn graph
- ancestor_node, str: a node
- descendent_node, str: a node
-
- Returns:
- graph, nx.DiGraph: the same graph without the bubble
- """
- paths = algorithms.all_simple_paths(graph, ancestor_node, descendent_node)
- paths2= []
- for p in paths:
- paths2.append(p)
-
- paths = paths2
-
- weights = []
- path_lens = []
- for path in paths:# constituting weights and length lists
- weights.append(path_average_weight(graph, path))
- path_lens.append(len(path))
-
- return select_best_path(graph, paths, weights, path_lens) # keep best path
-
-
- def simplify_bubbles(graph):
- """
- Returns a bubble-less graph
-
- Arguments:
- graph, nx.DiGraph: a de bruijn graph
-
- Returns:
- graph, nx.DiGraph: a bubble-less de bruijn graph
- """
- fork_nodes = []# empty list containing nodes with 2 or more ancestors
- f=1
- while f == 1:
- for node in graph:
- if graph.in_degree(node) >= 2: # if 2 or more ancestor add node
- pred = [n for n in graph.predecessors(node)]
- if len(pred) > 2:
- f = 1
- else:
- f = 0
- ancestor = algorithms.lowest_common_ancestor(graph,pred[0], pred[1])
- graph = solve_bubble(graph, ancestor, node)
-
- return graph
-
- def solve_entry_tips():
- pass
-
-
- def solve_out_tips():
- pass
-
- def main():
- fichier = str(sys.argv[1])
- k = int(sys.argv[2])
- hash_table = build_kmer_dict(fichier, k)
- G = build_graph(hash_table)
-
- nb = 0
- for n in G:
- if G.in_degree(n) > 1:
- nb += 1
-
- print("il y a {} bulles dans le graphe\n".format(nb))
-
- simplify_bubbles(G)
-
- nb = 0
- for n in G:
- if G.in_degree(n) > 1:
- nb += 1
-
- print("il y a {} bulles dans le graphe\n".format(nb))
-
-
- if __name__ == "__main__":
- main()
|