#!/usr/bin/env python3 """ FOREST Thomas (thomas.forest@college-de-france.fr) Caution : At the moment for gzipped files only. ARGS -------- standalone usage : vcf_to_sfs.py VCF.gz nb_indiv """ import gzip import sys import numpy as np from frst import customgraphics import json import time import datetime import gc import pandas as pd def parse_vcf(vcf_file, phased=False, stop_at=None, chr_starts_with="*"): start = time.time() with gzip.open(vcf_file, "rb") as inputgz: byte_line = inputgz.readline() genotypes = [] noGenotype = [] pos = 0 pluriall_counts = 0 entries = {} chrom = {} nb_site = 0 print("Parsing VCF {} ... Please wait...".format(vcf_file)) #print("Parsing VCF", vcf_file, "... Please wait...") while byte_line: #print(line) # decode gzipped binary lines line = byte_line.decode('utf-8').strip() nb_site += 1 # # every snp line, not comment or header if not line.startswith("##") and not line.startswith("#"): FIELDS = line.split("\t") # when line is parsed, delete it to save some memory CHROM = FIELDS[0] POS = int(FIELDS[1]) if stop_at: if POS > stop_at: break # REF is col 4 of VCF REF = FIELDS[3].split(",") # ALT is col 5 of VCF ALT = FIELDS[4].split(",") FORMAT = FIELDS[8:9] SAMPLES = FIELDS[9:] QUALITY = float(FIELDS[5]) INFO = FIELDS[7] INFOS = {} for info in INFO.split(";"): try : INFOS[info.split('=')[0]] = info.split('=')[1] except: INFOS[info] = info GENOTYPE = [] LIKELIHOOD = [] # SKIP the SNP if : # 1 : missing # 2 : deletion among REF # 3 : deletion among ALT if "./.:." in SAMPLES \ or len(ALT[0]) > 1 \ or len(REF[0]) > 1: # sites that are not kept # if at least one missing data, remember it noGenotype.append(POS) byte_line = inputgz.readline() continue elif len(ALT) > 1: # pluriall sites # remember that the gestion of PL is very diff with pluriall pluriall_counts += 1 noGenotype.append(POS) byte_line = inputgz.readline() continue else: # for unphased and with only two fields : GT:PL for sample in SAMPLES: if not phased: # for UNPHASED data sample_genotype = [int(a) for a in sample.split(':')[0].split('/') if a != '.'] else: # for PHASED sample_genotype = [int(a) for a in sample.split(':')[0].split('|') if a != '.'] # list of PL : [prob of 0/0, prob of 0/1, prob of 1/1] sample_likelihood = sample.split(':')[1].split(',') sample_likelihood = [pow(10, -int(sample_likelihood[0])/10), pow(10, -int(sample_likelihood[1])/10), pow(10, -int(sample_likelihood[2])/10)] GENOTYPE += sample_genotype LIKELIHOOD.append(sample_likelihood) # from log phred score to probability of error, E #LIKELIHOOD = pow(10, -int(LIKELIHOOD[0])/10) #print(LIKELIHOOD) entries = { 'POS':POS, 'CHR':CHROM, # removed to save some space #'FIELDS':FIELDS, 'REF':REF, 'ALT':ALT, 'FORMAT':FORMAT, 'INFOS':INFOS, 'SAMPLES':SAMPLES, 'QUALITY':QUALITY, 'GENOTYPE':GENOTYPE, 'LIKELIHOOD':LIKELIHOOD, 'NB_PLURIALL':pluriall_counts } if CHROM.startswith(chr_starts_with): # keep if chr name starts with filter # default : *, every chr is kept if CHROM not in chrom: # resent when changing chrom pluriall_counts = 0 chrom[CHROM] = {} chrom[CHROM][POS] = entries byte_line = inputgz.readline() end = time.time() print("Parsed", nb_site, "sites in", str(datetime.timedelta(seconds=end - start))) return chrom def build_polymorph_coverage_matrix(entries, noGenotype, diploid=True, na_omit = False, normalize = False, xlim=None): """ Take infos out of parsing to build a coverage matrix of probability of error, E, at each position """ # last pos without any missing data ./.:. last_pos = list(entries.keys())[-1] # last genotyped SNP with missing data, usually bigger than last_pos last_genotyped = noGenotype[-1] mat = [] if diploid: # k=N if diploids, k = 2N if haploids k = int(len(entries[last_pos]['GENOTYPE'])/2) else: k = len(entries[last_pos]['GENOTYPE']) for _ in range(k): mat.append([]) #display_max = last_pos if xlim: display_max = xlim else: if na_omit: display_max = last_pos else: display_max = last_genotyped for pos in range(display_max): if pos in noGenotype or pos not in entries.keys(): if na_omit: continue else: for k in range(len(mat)): # if missing, prob=1, worst case mat[k].append(1) else: for k in range(len(mat)): best_prob = min(entries[pos]['LIKELIHOOD'][k]) #print(ind, best_prob) mat[k].append(best_prob) mat = np.array(mat) if normalize: row_sums = mat.sum(axis=1) mat = mat / row_sums[:, np.newaxis] return mat def genotyping_continuity_plot(vcf_entries, verbose=False, step = 1): genotyped_pos = sorted(list(vcf_entries.keys())) last_pos = genotyped_pos[-1] x = 0 y = 1 coords = [[], []] print("Chr. len. =", last_pos, "bp \t ; nb. SNPs =", len(genotyped_pos[::step])) for k, pos in enumerate(genotyped_pos[::step]): if verbose: progress = round(k/int(last_pos))*100 if progress % 10 == 0: print(progress, "%") y+=1*step x=pos coords[0].append(x) coords[1].append(y) return coords def compute_coverage(vcf_entries, verbose=False): last_pos = list(vcf_entries.keys())[-1] x = 0 y = 1 coords = [[], []] print(last_pos, "sites to scan") for entry in vcf_entries.values(): y = int(entry['INFOS']['DP']) x = entry['POS'] coords[0].append(x) coords[1].append(y) return coords def free(obj): """ Free the object and call the garbage collector explicitely """ del obj gc.collect() def random_indiv_from_vcfs(vcf_files_list): """ Either for two VCFs of two indiv or two samplenames from the same VCF. """ if len(vcf_files_list)>1: # two invid from two VCFs for vcf in vcf_files_list: with open(vcf) as vcf_stream: for line in vcf_stream: if line.startswith('#'): continue genotype = line.split("\t")[-1].strip() if genotype.split(":")[1] != '0': genotype_list = genotype.split(':')[0].split("/") print(genotype_list) else: # two samplename to randomly pick pass if __name__ == "__main__": # check args if len(sys.argv) !=2: print("Need 1 arg") exit(0) # main vcf_file = sys.argv[1] # # without missing data # entries, noGenotype = parse_vcf(vcf_file, stop_at = 20000) # mat = build_polymorph_coverage_matrix(entries, noGenotype, diploid=True, na_omit=True, normalize=False, xlim = None) # customgraphics.plot_matrix(mat, color_scale_type="autumn", cbarlabel = "prob. of error, E", title="Prob. of error (E) of genotyping, at each position, lower the better") # # with missing data # entries, noGenotype = parse_vcf(vcf_file, stop_at = 500) # mat = build_polymorph_coverage_matrix(entries, noGenotype, diploid=True, na_omit=False, normalize=False, xlim = None) # customgraphics.plot_matrix(mat, color_scale_type="autumn", cbarlabel = "prob. of error, E", title="Prob. of error (E) of genotyping, at each position, lower the better")