123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- """
- Small assembly module based on de Bruijn graphs.
- """
- import networkx as nx
- from networkx import algorithms
-
def read_fastq(fichier):
    """
    Read the sequences stored in a fastq file.

    Records are expected to span exactly four lines (header, sequence,
    separator, quality); only the second line of each record is produced.

    Arguments:
        fichier, str: path to fastq file

    Returns:
        a str generator, generator of sequences with surrounding
        whitespace stripped
    """
    with open(fichier, 'r') as filin:
        # Walk the file with a single iterator rather than mixing ``for``
        # with ``readline()``: the sequence is line 1 of every 4-line record,
        # so a trailing blank line or a truncated record no longer produces
        # a spurious empty sequence.
        for index, line in enumerate(filin):
            if index % 4 == 1:
                yield line.strip()
-
def cut_kmer(seq, k):
    """
    Slide a window of width k along a sequence, one position at a time.

    Arguments:
        seq, str: a sequence
        k, int: k-mer size, must be shorter than len(seq)

    Returns:
        an iterator returning str, one k-mer per start position, in order
    """
    last_start = len(seq) - k
    start = 0
    while start <= last_start:
        yield seq[start:start + k]
        start += 1
-
def build_kmer_dict(fichier, k):
    """
    Count the occurrences of every k-mer found in a fastq file.

    Sequences come from read_fastq() and are split with cut_kmer().

    Arguments:
        fichier, str: path to fastq file
        k, int: k-mer size, must be shorter than len(seq)

    Return:
        hash_table, dict: dictionary with key = k-mer as str
        and value = number of occurrences of that k-mer
    """
    counts = {}
    for sequence in read_fastq(fichier):
        for kmer in cut_kmer(sequence, k):
            # A k-mer seen for the first time starts from 0.
            counts[kmer] = counts.get(kmer, 0) + 1

    return counts
-
def build_graph(hash_table):
    """
    Build a directed graph from a k-mer count table.

    Each k-mer contributes one edge going from its prefix (all characters but
    the last) to its suffix (all characters but the first); the edge carries
    the k-mer count in its ``weight`` attribute.

    Arguments:
        hash_table, dict: dictionary obtained with build_kmer_dict() function
    Return:
        graph, nx.DiGraph: the de Bruijn graph corresponding to hash_table
    """
    graph = nx.DiGraph()
    for kmer, occurrences in hash_table.items():
        prefix, suffix = kmer[:-1], kmer[1:]
        graph.add_edge(prefix, suffix, weight=occurrences)

    return graph
-
def get_starting_nodes(graph):
    """
    Collect the nodes that have no incoming edge.

    Arguments:
        graph, nx.DiGraph: de Bruijn graph

    Return:
        starting_nodes, list of strings: nodes whose in-degree is 0, in the
        graph's iteration order
    """
    return [node for node in graph if graph.in_degree(node) == 0]
-
def std():
    """
    Placeholder: not implemented yet, takes no argument and returns None.

    NOTE(review): the name suggests a standard-deviation helper; confirm the
    intended signature before implementing.
    """
-
-
def get_sink_nodes(graph):
    """
    Collect the nodes that have no outgoing edge.

    Arguments:
        graph, nx.DiGraph: de Bruijn graph

    Return:
        sink_nodes, list of strings: nodes whose out-degree is 0, in the
        graph's iteration order
    """
    return [node for node in graph if graph.out_degree(node) == 0]
-
-
def path_average_weight():
    """
    Placeholder: not implemented yet, takes no argument and returns None.

    NOTE(review): presumably meant to average the edge weights along a path;
    confirm the intended signature before implementing.
    """
-
-
def remove_paths():
    """
    Placeholder: not implemented yet, takes no argument and returns None.

    NOTE(review): presumably meant to delete paths from the graph; confirm
    the intended signature before implementing.
    """
-
-
def select_best_path():
    """
    Placeholder: not implemented yet, takes no argument and returns None.

    NOTE(review): presumably meant to choose one path among several
    candidates; confirm the intended signature before implementing.
    """
-
-
def save_contigs(tuples, outname):
    """
    Write contigs and their lengths to a text file, one contig per line.

    Each line holds the first and second items of a tuple separated by a
    single space.

    Arguments:
        tuples, list of tuple: (contig, length) pairs, as obtained from
        get_contigs()
        outname, str: name of the file to be written (opened in "w" mode, so
        an existing file is overwritten)
    """
    with open(outname, "w") as outfile:
        for duo in tuples:
            # End every record with a newline; without it successive records
            # run together on one line and cannot be told apart.
            outfile.write("{} {}\n".format(duo[0], duo[1]))
-
-
def get_contigs(graph, starting_nodes, sink_nodes):
    """
    Build one contig for every connected (starting node, sink node) pair.

    For each pair joined by a path, the shortest path is spelled out as the
    first node in full followed by the last character of every later node.

    Arguments:
        graph, nx.DiGraph: de Bruijn graph
        starting_nodes, list of strings: list of starting nodes
        sink_nodes, list of strings: list of terminal nodes

    Return:
        contigs, list of tuple: list of tuples (contig, len(contig))
    """
    contigs = []
    for source in starting_nodes:
        for target in sink_nodes:
            # Pairs that are not connected contribute nothing.
            if not algorithms.has_path(graph, source, target):
                continue
            nodes = algorithms.shortest_path(graph, source, target)
            # Consecutive nodes overlap on all but one character, so only the
            # last character of each later node extends the contig.
            sequence = nodes[0] + "".join(node[-1] for node in nodes[1:])
            contigs.append((sequence, len(sequence)))

    return contigs
-
def solve_bubble():
    """
    Placeholder: not implemented yet, takes no argument and returns None.

    NOTE(review): presumably meant to resolve a single bubble of the graph;
    confirm the intended signature before implementing.
    """
-
-
def simplify_bubbles():
    """
    Placeholder: not implemented yet, takes no argument and returns None.

    NOTE(review): presumably meant to remove every bubble of the graph;
    confirm the intended signature before implementing.
    """
-
-
def solve_entry_tips():
    """
    Placeholder: not implemented yet, takes no argument and returns None.

    NOTE(review): presumably meant to prune spurious entry tips of the graph;
    confirm the intended signature before implementing.
    """
-
-
def solve_out_tips():
    """
    Placeholder: not implemented yet, takes no argument and returns None.

    NOTE(review): presumably meant to prune spurious exit tips of the graph;
    confirm the intended signature before implementing.
    """
|