''' Awesome people who have directly contributed to the project: Jon Palmer - Bug finder & advice on project direction Mahmut Uludag - Bug finder Help: print pybam.wat Github: http://github.com/JohnLonginotto/pybam This code was written by John Longinotto, a PhD student of the Pospisilik Lab at the Max Planck Institute of Immunbiology & Epigenetics, Freiburg. My PhD is funded by the Deutsches Epigenom Programm (DEEP), and the Max Planck IMPRS Program. I study Adipose Biology and Circadian Rhythm in mice, although it seems these days I spend most of my time at the computer :-) Ported to Python3 by Thomas Forest (thomas.forest@college-de-france.fr) ''' import os import sys import zlib import time import tempfile import subprocess from array import array from struct import unpack CtoPy = { 'A':'"' } wat = ''' Main class: pybam.read Github: http://github.com/JohnLonginotto/pybam [ Dynamic Parser Example ] for alignment in pybam.read('/my/data.bam'): print alignment.sam_seq [ Static Parser Example ] for seq,mapq in pybam.read('/my/data.bam',['sam_seq','sam_mapq']): print seq print mapq [ Mixed Parser Example ] my_bam = pybam.read('/my/data.bam',['sam_seq','sam_mapq']) print my_bam._static_parser_code for seq,mapq in my_bam: if seq.startswith('ACGT') and mapq > 10: print my_bam.sam [ Custom Decompressor (from file path) Example ] my_bam = pybam.read('/my/data.bam.lzma',decompressor='lzma --decompress --stdout /my/data.bam.lzma') [ Custom Decompressor (from file object) Example ] my_bam = pybam.read(sys.stdin,decompressor='lzma --decompress --stdout') # data given to lzma via stdin [ Force Internal bgzip Decompressor ] my_bam = pybam.read('/my/data.bam',decompressor='internal') [ Parse Words (hah) ]''' wat += '\n'+''.join([('\n===============================================================================================\n\n ' if code == 'file_alignments_read' or code == 'sam' else ' ')+(code+' ').ljust(25,'-')+description+'\n' for code,description in sorted(parse_codes.items())]) + '\n' class read: ''' [ Dynamic Parser Example ] for alignment in pybam.read('/my/data.bam'): print alignment.sam_seq [ Static Parser Example ] for seq,mapq in pybam.read('/my/data.bam',['sam_seq','sam_mapq']): print seq print mapq [ Mixed Parser Example ] my_bam = pybam.read('/my/data.bam',['sam_seq','sam_mapq']) print my_bam._static_parser_code for seq,mapq in my_bam: if seq.startswith('ACGT') and mapq > 10: print my_bam.sam [ Custom Decompressor (from file path) Example ] my_bam = pybam.read('/my/data.bam.lzma',decompressor='lzma --decompress --stdout /my/data.bam.lzma') [ Custom Decompressor (from file object) Example ] my_bam = pybam.read(sys.stdin,decompressor='lzma --decompress --stdout') # data given to lzma via stdin [ Force Internal bgzip Decompressor ] my_bam = pybam.read('/my/data.bam',decompressor='internal') "print pybam.wat" in the python terminal to see the possible parsable values, or v==it http://github.com/JohnLonginotto/pybam for the latest info. ''' def __init__(self,f,fields=False,decompressor=False): self.file_bytes_read = 0 self.file_chromosomes = [] self.file_alignments_read = 0 self.file_chromosome_lengths = {} if fields != False: if type(fields) != list or len(fields) == 0: raise PybamError('\n\nFields for the static parser must be provided as a non-empty list. You gave a ' + str(type(fields)) + '\n') else: for field in fields: if field.startswith('sam') or field.startswith('bam'): if field not in parse_codes.keys(): raise PybamError('\n\nStatic parser field "' + str(field) + '" from fields ' + str(fields) + ' is not known to this version of pybam!\nPrint "pybam.wat" to see available field names with explinations.\n') else: raise PybamError('\n\nStatic parser field "' + str(field) + '" from fields ' + str(fields) + ' does not start with "sam" or "bam" and thus is an avaliable field for the static parsing.\nPrint "pybam.wat" in interactive python to see available field names with explinations.\n') if decompressor: if type(decompressor) == str: if decompressor != 'internal' and '{}' not in decompressor: raise PybamError('\n\nWhen a custom decompressor is used and the input file is a string, the decompressor string must contain at least one occurence of "{}" to be substituted with a filepath by pybam.\n') else: raise PybamError('\n\nUser-supplied decompressor must be a string that when run on the command line decompresses a named file (or stdin), to stdout:\ne.g. "lzma --decompress --stdout {}" if pybam is provided a path as input file, where {} is substituted for that path.\nor just "lzma --decompress --stdout" if pybam is provided a file object instead of a file path, as data from that file object will be piped via stdin to the decompression program.\n') ## First we make a generator that will return chunks of uncompressed data, regardless of how we choose to decompress: def generator(): DEVNULL = open(os.devnull, 'wb') # First we need to figure out what sort of file we have - whether it's gzip compressed, uncompressed, or something else entirely! if type(f) == str: try: self._file = open(f,'rb') except: raise PybamError('\n\nCould not open "' + str(self._file.name) + '" for reading!\n') try: magic = os.read(self._file.fileno(),4) except: raise PybamError('\n\nCould not read from "' + str(self._file.name) + '"!\n') elif type(f) == file: self._file = f try: magic = os.read(self._file.fileno(),4) except: raise PybamError('\n\nCould not read from "' + str(self._file.name) + '"!\n') else: raise PybamError('\n\nInput file was not a string or a file object. It was: "' + str(f) + '"\n') self.file_name = os.path.basename(os.path.realpath(self._file.name)) self.file_directory = os.path.dirname(os.path.realpath(self._file.name)) if magic == 'BAM\1': # The user has passed us already unzipped BAM data! Job done :) data = 'BAM\1' + self._file.read(35536) self.file_bytes_read += len(data) self.file_decompressor = 'None' while data: yield data data = self._file.read(35536) self.file_bytes_read += len(data) self._file.close() DEVNULL.close() raise StopIteration elif magic == "\x1f\x8b\x08\x04": # The user has passed us compressed gzip/bgzip data, which is typical for a BAM file # use custom decompressor if provided: if decompressor != False and decompressor != 'internal': if type(f) == str: self._subprocess = subprocess.Popen( decompressor.replace('{}',f), shell=True, stdout=subprocess.PIPE, stderr=DEVNULL) else: self._subprocess = subprocess.Popen('{ printf "'+magic+'"; cat; } | ' + decompressor, stdin=self._file, shell=True, stdout=subprocess.PIPE, stderr=DEVNULL) self.file_decompressor = decompressor data = self._subprocess.stdout.read(35536) self.file_bytes_read += len(data) while data: yield data data = self._subprocess.stdout.read(35536) self.file_bytes_read += len(data) self._file.close() DEVNULL.close() raise StopIteration # else look for pigz or gzip: else: try: self._subprocess = subprocess.Popen(["pigz"],stdin=DEVNULL,stdout=DEVNULL,stderr=DEVNULL) if self._subprocess.returncode == None: self._subprocess.kill() use = 'pigz' except OSError: try: self._subprocess = subprocess.Popen(["gzip"],stdin=DEVNULL,stdout=DEVNULL,stderr=DEVNULL) if self._subprocess.returncode == None: self._subprocess.kill() use = 'gzip' except OSError: use = 'internal' if use != 'internal' and decompressor != 'internal': if type(f) == str: self._subprocess = subprocess.Popen([ use , '--decompress','--stdout', f ], stdout=subprocess.PIPE, stderr=DEVNULL) else: self._subprocess = subprocess.Popen('{ printf "'+magic+'"; cat; } | ' + use + ' --decompress --stdout', stdin=f, shell=True, stdout=subprocess.PIPE, stderr=DEVNULL) time.sleep(1) if self._subprocess.poll() == None: data = self._subprocess.stdout.read(35536) self.file_decompressor = use self.file_bytes_read += len(data) while data: yield data data = self._subprocess.stdout.read(35536) self.file_bytes_read += len(data) self._file.close() DEVNULL.close() raise StopIteration # Python's gzip module can't read from a stream that doesn't support seek(), and the zlib module cannot read the bgzip format without a lot of help: self.file_decompressor = 'internal' raw_data = magic + self._file.read(65536) self.file_bytes_read = len(raw_data) internal_cache = [] blocks_left_to_grab = 50 bs = 0 checkpoint = 0 decompress = zlib.decompress while raw_data: if len(raw_data) - bs < 35536: raw_data = raw_data[bs:] + self._file.read(65536) self.file_bytes_read += len(raw_data) - bs bs = 0 magic = raw_data[bs:bs+4] if not magic: break # a child's heart if magic != "\x1f\x8b\x08\x04": raise PybamError('\n\nThe input file is not in a format I understand. First four bytes: ' + repr(magic) + '\n') try: more_bs = bs + unpack(" sam.\n\nThese two headers should always be the same, but apparently they are not:\nThe ASCII header looks like: ' + self.file_header + '\nWhile the binary header has the following chromosomes: ' + self.file_chromosomes + '\n') ## Variable parsing: def new_entry(header_cache): cache = header_cache # we keep a small cache of X bytes of decompressed BAM data, to smoothen out disk access. p = 0 # where the next alignment/entry starts in the cache while True: try: while len(cache) < p + 4: cache = cache[p:] + next(self._generator); p = 0 # Grab enough bytes to parse blocksize self.sam_block_size = unpack('> 4 , cigar_codes[cig & 0b1111] ) for cig in array('I', self.bam[self._end_of_qname : self._end_of_cigar ])] @property def sam_cigar_string(self): return ''.join( [ str(cig >> 4) + cigar_codes[cig & 0b1111] for cig in array('I', self.bam[self._end_of_qname : self._end_of_cigar ])]) @property def sam_seq(self): return ''.join( [ dna_codes[dna >> 4] + dna_codes[dna & 0b1111] for dna in array('B', self.bam[self._end_of_cigar : self._end_of_seq ])])[:self.sam_l_seq] # As DNA is 4 bits packed 2-per-byte, there might be a trailing '0000', so we can either @property def sam_qual(self): return ''.join( [ chr(ord(quality) + 33) for quality in self.bam[self._end_of_seq : self._end_of_qual ]]) @property def sam_tags_list(self): result = [] offset = self._end_of_qual while offset != len(self.bam): tag_name = self.bam[offset:offset+2] tag_type = self.bam[offset+2] if tag_type == 'Z': offset_end = self.bam.index('\x00',offset+3)+1 tag_data = self.bam[offset+3:offset_end-1] elif tag_type in CtoPy: offset_end = offset+3+py4py[tag_type] tag_data = unpack(CtoPy[tag_type],self.bam[offset+3:offset_end])[0] elif tag_type == 'B': offset_end = offset+8+(unpack('