#!/usr/bin/env python3

"""
FOREST Thomas (thomas.forest@college-de-france.fr)

Caution: at the moment, for gzipped VCF files only.

ARGS
--------

standalone usage : vcf_to_sfs.py VCF.gz nb_indiv

"""
import gzip
import sys
import numpy as np
from frst import customgraphics
import time
import datetime
import gc


def parse_vcf(vcf_file, phased=False, stop_at=None, chr_starts_with="*"):
    """ Parse a gzipped VCF and keep, per chromosome, one entry per
    biallelic SNP without missing genotypes.
    """
    start = time.time()
    with gzip.open(vcf_file, "rb") as inputgz:
        byte_line = inputgz.readline()
        genotypes = []
        noGenotype = []
        pos = 0
        pluriall_counts = 0
        entries = {}
        chrom = {}
        nb_site = 0
        print("Parsing VCF {} ... Please wait...".format(vcf_file))
        while byte_line:
            # decode gzipped binary lines
            line = byte_line.decode('utf-8').strip()
            # every SNP line, i.e. neither header nor comment
            if not line.startswith("#"):
                nb_site += 1
                FIELDS = line.split("\t")
                CHROM = FIELDS[0]
                POS = int(FIELDS[1])
                if stop_at:
                    if POS > stop_at:
                        break
                # REF is col 4 of VCF
                REF = FIELDS[3].split(",")
                # ALT is col 5 of VCF
                ALT = FIELDS[4].split(",")
                FORMAT = FIELDS[8:9]
                SAMPLES = FIELDS[9:]
                QUALITY = float(FIELDS[5])
                INFO = FIELDS[7]
                INFOS = {}
                for info in INFO.split(";"):
                    try:
                        # key=value INFO fields
                        INFOS[info.split('=')[0]] = info.split('=')[1]
                    except IndexError:
                        # flag-type INFO fields have no '=' separator
                        INFOS[info] = info
                GENOTYPE = []
                LIKELIHOOD = []
                # SKIP the SNP if :
                # 1 : missing genotype
                # 2 : deletion among REF
                # 3 : deletion among ALT
                if "./.:." in SAMPLES \
                   or len(ALT[0]) > 1 \
                   or len(REF[0]) > 1:
                    # sites that are not kept ;
                    # if there is at least one missing genotype, remember the position
                    noGenotype.append(POS)
                    byte_line = inputgz.readline()
                    continue
                elif len(ALT) > 1:
                    # multiallelic sites ;
                    # remember that the handling of PL is very different for multiallelics
                    pluriall_counts += 1
                    noGenotype.append(POS)
                    byte_line = inputgz.readline()
                    continue
                else:
                    # for unphased data with only two FORMAT fields : GT:PL
                    for sample in SAMPLES:
                        if not phased:
                            # for UNPHASED data
                            sample_genotype = [int(a) for a in sample.split(':')[0].split('/') if a != '.']
                        else:
                            # for PHASED data
                            sample_genotype = [int(a) for a in sample.split(':')[0].split('|') if a != '.']
                        # list of PL : [prob of 0/0, prob of 0/1, prob of 1/1]
                        sample_likelihood = sample.split(':')[1].split(',')
                        # from phred-scaled likelihood to probability of error : E = 10^(-PL/10)
                        # e.g. PL=0 -> 1.0, PL=10 -> 0.1, PL=20 -> 0.01
                        sample_likelihood = [pow(10, -int(sample_likelihood[0])/10),
                                             pow(10, -int(sample_likelihood[1])/10),
                                             pow(10, -int(sample_likelihood[2])/10)]
                        GENOTYPE += sample_genotype
                        LIKELIHOOD.append(sample_likelihood)
                    entries = {
                        'POS': POS,
                        'CHR': CHROM,
                        # FIELDS removed from the entry to save some space
                        # 'FIELDS': FIELDS,
                        'REF': REF,
                        'ALT': ALT,
                        'FORMAT': FORMAT,
                        'INFOS': INFOS,
                        'SAMPLES': SAMPLES,
                        'QUALITY': QUALITY,
                        'GENOTYPE': GENOTYPE,
                        'LIKELIHOOD': LIKELIHOOD,
                        'NB_PLURIALL': pluriall_counts
                    }
                    if chr_starts_with == "*" or CHROM.startswith(chr_starts_with):
                        # keep the site only if the chromosome name starts with the filter ;
                        # default "*" keeps every chromosome
                        if CHROM not in chrom:
                            # reset when changing chromosome
                            pluriall_counts = 0
                            chrom[CHROM] = {}
                        chrom[CHROM][POS] = entries
            byte_line = inputgz.readline()
    end = time.time()
    print("Parsed", nb_site, "sites in", str(datetime.timedelta(seconds=end - start)))

    # also return the positions without usable genotypes, needed downstream
    return chrom, noGenotype

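# The per-site GENOTYPE lists collected by parse_vcf() are what a site frequency
# spectrum (SFS) is built from. The helper below is only a minimal sketch of that
# step (it is not part of the original pipeline): it assumes strictly biallelic
# sites, as enforced by parse_vcf(), and counts the ALT alleles carried per site.
def sfs_from_parsed_chrom(chrom_dict, n_alleles):
    """ Unfolded SFS from the {chrom: {pos: entry}} structure returned by
    parse_vcf(). n_alleles is 2N for N diploid individuals.
    """
    sfs = [0] * (n_alleles + 1)
    for positions in chrom_dict.values():
        for entry in positions.values():
            # entry['GENOTYPE'] is a flat list of 0/1 alleles for the site
            sfs[sum(entry['GENOTYPE'])] += 1
    return sfs
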
def build_polymorph_coverage_matrix(entries, noGenotype, diploid=True, na_omit=False, normalize=False, xlim=None):
    """ Take infos out of the parsing to build a coverage matrix of the
    probability of error, E, at each position
    """
    # last position without any missing data ./.:.
    last_pos = list(entries.keys())[-1]
    # last genotyped SNP with missing data, usually bigger than last_pos
    last_genotyped = noGenotype[-1]
    mat = []
    if diploid:
        # one matrix row per individual : GENOTYPE holds 2 alleles per
        # individual for diploids, 1 per individual for haploids
        k = int(len(entries[last_pos]['GENOTYPE'])/2)
    else:
        k = len(entries[last_pos]['GENOTYPE'])
    for _ in range(k):
        mat.append([])
    if xlim:
        display_max = xlim
    else:
        if na_omit:
            display_max = last_pos
        else:
            display_max = last_genotyped
    for pos in range(display_max):
        if pos in noGenotype or pos not in entries.keys():
            if na_omit:
                continue
            else:
                for ind in range(len(mat)):
                    # if missing, prob=1, worst case
                    mat[ind].append(1)
        else:
            for ind in range(len(mat)):
                best_prob = min(entries[pos]['LIKELIHOOD'][ind])
                mat[ind].append(best_prob)

    mat = np.array(mat)
    if normalize:
        row_sums = mat.sum(axis=1)
        mat = mat / row_sums[:, np.newaxis]
    return mat

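# Alternative visualisation: the __main__ examples below rely on the in-house
# frst.customgraphics module, but the matrix returned by
# build_polymorph_coverage_matrix() can also be inspected with plain matplotlib.
# This helper is only an illustrative sketch, not part of the original pipeline.
def quick_plot_error_matrix(mat):
    """ Heatmap of the per-individual, per-position probability of error, E. """
    import matplotlib.pyplot as plt
    plt.imshow(mat, aspect="auto", cmap="autumn")
    plt.colorbar(label="prob. of error, E")
    plt.xlabel("position (bp)")
    plt.ylabel("individual")
    plt.show()
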
def genotyping_continuity_plot(vcf_entries,
                               verbose=False,
                               step=1):
    genotyped_pos = sorted(list(vcf_entries.keys()))
    last_pos = genotyped_pos[-1]
    x = 0
    y = 1
    coords = [[], []]
    nb_snps = len(genotyped_pos[::step])
    print("Chr. len. =", last_pos, "bp \t ; nb. SNPs =", nb_snps)
    for k, pos in enumerate(genotyped_pos[::step]):
        if verbose:
            # percentage of the SNPs processed so far
            progress = round(k / nb_snps * 100)
            if progress % 10 == 0:
                print(progress, "%")
        y += step
        x = pos
        coords[0].append(x)
        coords[1].append(y)
    return coords

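# Usage sketch (not part of the original script): the coords returned by
# genotyping_continuity_plot() are [SNP positions, cumulative SNP count], so the
# positions alone are enough to derive a local SNP density with numpy. The
# default window size below is an arbitrary example value.
def snp_density_per_window(coords, window=10000):
    """ Number of genotyped SNPs falling in each window of `window` bp. """
    positions = np.array(coords[0])
    bins = np.arange(0, positions.max() + window, window)
    counts, _ = np.histogram(positions, bins=bins)
    # left edge of each window and the SNP count inside it
    return bins[:-1], counts
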
def compute_coverage(vcf_entries, verbose=False):
    x = 0
    y = 1
    coords = [[], []]
    print(len(vcf_entries), "sites to scan")
    for entry in vcf_entries.values():
        # DP : total read depth at the site, taken from the INFO field
        y = int(entry['INFOS']['DP'])
        x = entry['POS']
        coords[0].append(x)
        coords[1].append(y)
    return coords

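# Usage sketch (assumption, not in the original script): the second list returned
# by compute_coverage() holds the per-site DP values, so it can be summarised
# directly with numpy. "chr1" below is a hypothetical chromosome key.
# coverage = compute_coverage(chrom_dict["chr1"])
# depths = np.array(coverage[1])
# print("mean DP =", depths.mean(), "; median DP =", np.median(depths))
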
def free(obj):
    """ Drop this reference to the object and call the garbage collector
    explicitly
    """
    del obj
    gc.collect()

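# Usage note (sketch): Python passes references, so free(chrom_dict) on its own
# cannot release the caller's dictionary; the calling scope should also drop its
# own name, e.g.:
# free(chrom_dict)
# chrom_dict = None   # or `del chrom_dict`, so no reference keeps the dict alive
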
def random_indiv_from_vcfs(vcf_files_list):
    """
    Either for two VCFs of two individuals, or two sample names from the same VCF.
    """
    if len(vcf_files_list) > 1:
        # two individuals from two VCFs
        for vcf in vcf_files_list:
            with open(vcf) as vcf_stream:
                for line in vcf_stream:
                    if line.startswith('#'):
                        continue
                    genotype = line.split("\t")[-1].strip()
                    if genotype.split(":")[1] != '0':
                        genotype_list = genotype.split(':')[0].split("/")
                        print(genotype_list)
    else:
        # two sample names to randomly pick from the same VCF (not implemented yet)
        pass

if __name__ == "__main__":
    # check args
    if len(sys.argv) != 2:
        print("usage: vcf_to_sfs.py VCF.gz")
        sys.exit(1)
    # main
    vcf_file = sys.argv[1]

    # # without missing data
    # entries, noGenotype = parse_vcf(vcf_file, stop_at = 20000)
    # mat = build_polymorph_coverage_matrix(entries, noGenotype, diploid=True, na_omit=True, normalize=False, xlim = None)
    # customgraphics.plot_matrix(mat, color_scale_type="autumn", cbarlabel = "prob. of error, E", title="Prob. of error (E) of genotyping, at each position, lower the better")

    # # with missing data
    # entries, noGenotype = parse_vcf(vcf_file, stop_at = 500)
    # mat = build_polymorph_coverage_matrix(entries, noGenotype, diploid=True, na_omit=False, normalize=False, xlim = None)
    # customgraphics.plot_matrix(mat, color_scale_type="autumn", cbarlabel = "prob. of error, E", title="Prob. of error (E) of genotyping, at each position, lower the better")
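
    # # site frequency spectrum (usage sketch, not in the original script):
    # # sfs_from_parsed_chrom is the helper sketched above; n_alleles=20 assumes
    # # 10 diploid individuals and is only an example value
    # chrom_dict, noGenotype = parse_vcf(vcf_file)
    # sfs = sfs_from_parsed_chrom(chrom_dict, n_alleles=20)
    # print(sfs)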