""" Small assembly module based on de bruijn graphs """ import os import sys import statistics import networkx as nx from networkx import algorithms def read_fastq(fichier): """ Returns an iterator object that retrieves only the nucleic sequences of a fastq file. Arguments: fichier, str: path to fastq file Returns: a str generator, generator of sequences """ with open(fichier, 'r') as filin: for line in filin: yield filin.readline().strip() filin.readline() filin.readline() def cut_kmer(seq, k): """ Returns an iterator that returns k-mers of k-size of a sequence Arguments: seq, str: a sequence k, int: k-mer size, must be shorter than len(seq) Returns: an iterator returning str """ for i in range(len(seq)-(k-1)): yield seq[i:i+k] def build_kmer_dict(fichier, k): """ Arguments: fichier, str: path to fastq file k, int: k-mer size, must be shorter than len(seq) Return: hash_table, dict: dictionnary with key = k-mer as str and value count of k-mer occurence """ hash_table = {}# initialise empty hash table it_fastq = read_fastq(fichier) for seq in it_fastq: # for each sequence it_kmer = cut_kmer(seq, k) # count each occurence of k-mer for kmer in it_kmer: try: hash_table[kmer] except KeyError: hash_table[kmer] = 1 else: hash_table[kmer] += 1 return hash_table def build_graph(hash_table): """ Returns a graph from a hash table Arguments: hash_table, dict: dictionnary obtained with build_kmer_dict() function Return: graph, nx.DiGraph: the de Bruijn tree corresponding to hash_table """ graph = nx.DiGraph() for key in hash_table: graph.add_edge(key[:-1], key[1:], weight=hash_table[key]) return graph def get_starting_nodes(graph): """ Returns the list of starting nodes of a graph Arguments: graph, nx.DiGraph: de Bruijn tree Return: starting_nodes, list of strings: list of starting nodes """ starting_nodes = [] for node in graph: if graph.in_degree(node) == 0: # if count of input edge == 0 starting_nodes.append(node) return starting_nodes def std(values): """ Computes standard deviation from a list of value Arguments: values, list: list of values Returns : standard deviation of the 'values' data list """ return statistics.stdev(float(values)) def get_sink_nodes(graph): """ Arguments: graph, nx.DiGraph: de Bruijn tree Return: sink_nodes, list of strings: list of terminal nodes """ sink_nodes = [] for node in graph: if graph.out_degree(node) == 0: # if count of output edge == 0 sink_nodes.append(node) return sink_nodes def path_average_weight(graph, path): """ Arguments: graph, nx.DiGraph: a de bruijn graph path, list of str(nodes): list of nodes constituing a path Return: mean weight, float: the mean weight of the path """ weight = 0 for i in range(len(path)-1): weight += graph.edges[path[i], path[i+1]]["weight"] weight = weight/(len(path) - 1) return weight def remove_paths(graph, paths, delete_entry_node=False, delete_sink_node=False): """ Removes paths in 'paths' list from 'graph' object. Don't delete entry and sink nodes unless specified. Arguments: graph, nd.DiGraph(): de bruijn graph paths, list of string lists: list of paths composed of nodes delete_entry_node, boolean: Delete entry if True delete_sink_node, boolean: Delete sink node if True Return: graph, nd.DiGraph(): de bruijn graph with deletes paths. """ entry = 1 sink = -1 if delete_entry_node == True: entry = 0 if delete_sink_node == True: sink = None for path in paths: graph.remove_nodes_from(path[entry:sink]) return graph def select_best_path(graph, paths, path_lens, mean_weights, delete_entry_node=False, delete_sink_node=False): """ Given path list, their length and weight, keeps only the best path among them considering the following priority : weight, length and randomly Arguments: graph, nx.DiGraph: a de bruijn graph paths, list of str: list of paths path_lens: lengths of the paths mean_weights: mean weights of the paths delete_entry_node, boolean: either or not if the entry node should be deleted delete_sink_node, boolean: either or not if the sink node should be deleted Returns: graph, nx.DiGraph: graph with deleted paths """ max_weight = max(mean_weights) heaviest = [i for i, j in enumerate(mean_weights) if j == max_weight] if len(heaviest) > 1: max_len = max(path_lens) longest = [i for i in heaviest if path_lens[i] == max_len] if len(longest) > 1: Random.seed(9001) best = random.choice[longest] else: best = longest[0] else: best = heaviest[0] for p in paths: print(p) paths2 = [p for p in paths] paths2.pop(best) return remove_paths(graph, paths2, delete_entry_node, delete_sink_node) def fill(text, width=80): """Split text with a line return to respect fasta format""" return os.linesep.join(text[i:i+width] for i in range(0, len(text), width)) def save_contigs(tuples, outname): """ Arguments: tuples, tuple: Obtained from get_contigs() outname, str: name of the file to be written """ i = 0 with open(outname, "w") as outfile: for duo in tuples: i += 1 outfile.write(">contig_{} len={}\n".format(i, duo[1])) outfile.write("{}\n".format(fill(duo[0]))) return def get_contigs(graph, starting_nodes, sink_nodes): """ Arguments: graph, nx.DiGraph: de Bruijn tree starting_nodes, list of strings: list of starting nodes sink_nodes, list of strings: list of terminal nodes Return: contigs, list of tupple: list of tupple (contigs, len(contigs)) """ contigs = [] for starting_node in starting_nodes: for sink_node in sink_nodes: if algorithms.has_path(graph, starting_node, sink_node) == True: path = algorithms.shortest_path(graph, starting_node, sink_node) contig = path[0] # base of the contig is seq of the first node for i in range(len(path)-1): contig += path[i+1][-1] # adds last char of node contigs.append((contig, len(contig))) return contigs def solve_bubble(graph, ancestor_node, descendent_node): """ solve a bubble Arguments: graph, nx.DiGraph: a de bruijn graph ancestor_node, str: a node descendent_node, str: a node Returns: graph, nx.DiGraph: the same graph without the bubble """ paths = algorithms.all_simple_paths(graph, ancestor_node, descendent_node) paths2= [] for p in paths: paths2.append(p) paths = paths2 weights = [] path_lens = [] for path in paths:# constituting weights and length lists weights.append(path_average_weight(graph, path)) path_lens.append(len(path)) return select_best_path(graph, paths, weights, path_lens) # keep best path def simplify_bubbles(graph): """ Returns a bubble-less graph Arguments: graph, nx.DiGraph: a de bruijn graph Returns: graph, nx.DiGraph: a bubble-less de bruijn graph """ fork_nodes = []# empty list containing nodes with 2 or more ancestors f=1 while f == 1: for node in graph: if graph.in_degree(node) >= 2: # if 2 or more ancestor add node pred = [n for n in graph.predecessors(node)] if len(pred) > 2: f = 1 else: f = 0 ancestor = algorithms.lowest_common_ancestor(graph,pred[0], pred[1]) graph = solve_bubble(graph, ancestor, node) return graph def solve_entry_tips(): pass def solve_out_tips(): pass def main(): fichier = str(sys.argv[1]) k = int(sys.argv[2]) hash_table = build_kmer_dict(fichier, k) G = build_graph(hash_table) nb = 0 for n in G: if G.in_degree(n) > 1: nb += 1 print("il y a {} bulles dans le graphe\n".format(nb)) simplify_bubbles(G) nb = 0 for n in G: if G.in_degree(n) > 1: nb += 1 print("il y a {} bulles dans le graphe\n".format(nb)) if __name__ == "__main__": main()