Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Make BAM readers distributable #4

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/bam/bai.jl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ function BAI(input::IO)
return read_bai(input)
end

"""
    findbai(filepath::AbstractString)

Search for a BAI index file alongside `filepath` (at `"<filepath>.bai"`).
Return `Nullable(BAI(...))` when the index file exists, otherwise an
empty `Nullable{BAI}()`.
"""
function findbai(filepath::AbstractString)
    baipath = filepath * ".bai"
    if isfile(baipath)
        return Nullable(BAI(baipath))
    end
    return Nullable{BAI}()
end

# Read a BAI object from `input`.
function read_bai(input::IO)
# check magic bytes
Expand Down
32 changes: 18 additions & 14 deletions src/bam/overlap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ end
# Iterator
# --------

mutable struct OverlapIteratorState
mutable struct OverlapIteratorState{S}
# reader's state
readerstate::ReaderState{S,Record}

# reference index
refindex::Int

Expand All @@ -40,30 +43,31 @@ mutable struct OverlapIteratorState

# current chunk index
chunkid::Int

# pre-allocated record
record::Record
end

# Start overlap iteration: the span contained interleaved removed/added diff
# lines (duplicate refindex/chunks/seek statements); this is the post-change
# version only.
function Base.start(iter::OverlapIterator)
    # Build a per-iteration reader state; for filepath-backed readers this
    # presumably opens a fresh reader so iteration does not share a stream
    # (see `ReaderState(::Reader{String})` elsewhere in this file).
    readerstate = ReaderState(iter.reader)
    reader = readerstate.reader
    refindex = findfirst(reader.refseqnames, iter.refname)
    if refindex == 0
        throw(ArgumentError("sequence name $(iter.refname) is not found in the header"))
    end
    # Overlap queries require a BAI index to be attached to the reader.
    @assert !isnull(reader.index)
    chunks = GenomicFeatures.Indexes.overlapchunks(get(reader.index).index, refindex, iter.interval)
    if !isempty(chunks)
        # Jump to the first candidate chunk's virtual offset.
        seek(reader.input, first(chunks).start)
    end
    return OverlapIteratorState(readerstate, refindex, chunks, 1)
end

function Base.done(iter::OverlapIterator, state)
reader = state.readerstate.reader
record = state.readerstate.record
while state.chunkid ≤ endof(state.chunks)
chunk = state.chunks[state.chunkid]
while BGZFStreams.virtualoffset(iter.reader.stream) < chunk.stop
read!(iter.reader, state.record)
c = compare_intervals(state.record, (state.refindex, iter.interval))
while BGZFStreams.virtualoffset(reader.input) < chunk.stop
read!(reader, record)
c = compare_intervals(record, (state.refindex, iter.interval))
if c == 0
# overlapping
return false
Expand All @@ -74,14 +78,14 @@ function Base.done(iter::OverlapIterator, state)
end
state.chunkid += 1
if state.chunkid ≤ endof(state.chunks)
seek(iter.reader, state.chunks[state.chunkid].start)
seek(reader.input, state.chunks[state.chunkid].start)
end
end
return true
end

# Yield the next overlapping record. The span contained both the old
# (`state.record`) and new (`state.readerstate.record`) return lines from
# the diff; this keeps only the post-change version.
function Base.next(::OverlapIterator, state)
    # Return a copy so the caller's record is not clobbered by the next read
    # into the pre-allocated buffer.
    return copy(state.readerstate.record), state
end

function compare_intervals(record::Record, interval::Tuple{Int,UnitRange{Int}})
Expand Down
186 changes: 97 additions & 89 deletions src/bam/reader.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,100 @@
# ==========

"""
BAM.Reader(input::IO; index=nothing)
BAM.Reader(input; index=nothing)

Create a data reader of the BAM file format.

# Arguments
* `input`: data source
* `index=nothing`: filepath to a random access index (currently *bai* is Supported)
* `input`: data source (filepath or readable `IO` object)
* `index=nothing`: filepath to a random access index (currently *bai* is supported)
"""
mutable struct Reader{T} <: Bio.IO.AbstractReader
stream::BGZFStreams.BGZFStream{T}
mutable struct Reader{T<:Union{String,BGZFStreams.BGZFStream}} <: Bio.IO.AbstractReader
# data source
input::T

# BAM index
index::Nullable{BAI}

# header data
header::SAM.Header
start_offset::BGZFStreams.VirtualOffset
refseqnames::Vector{String}
refseqlens::Vector{Int}
index::Nullable{BAI}
end

# `Record` is the element type produced when iterating a `Reader`.
function Base.eltype{T}(::Type{Reader{T}})
    return Record
end
# Construct a Reader over an open BGZF stream: normalize the `index`
# argument, validate the BAM magic bytes, then parse the SAM text header and
# the reference-sequence dictionary. (The original span had two lines of the
# removed `Bio.IO.stream` definition interleaved by the diff; they are
# dropped here.)
function Reader(input::BGZFStreams.BGZFStream; index=nothing)
    # Accept nothing / BAI / filepath / Nullable{BAI} for `index` and
    # normalize to Nullable{BAI}.
    if index == nothing
        index = Nullable{BAI}()
    elseif index isa BAI
        index = Nullable(index)
    elseif index isa AbstractString
        index = Nullable(BAI(index))
    elseif index isa Nullable{BAI}
        # ok
    else
        error("unrecognizable index argument: $(typeof(index))")
    end

    # magic bytes: a valid BAM file starts with 'B', 'A', 'M', 0x01
    B = read(input, UInt8)
    A = read(input, UInt8)
    M = read(input, UInt8)
    x = read(input, UInt8)
    if B != UInt8('B') || A != UInt8('A') || M != UInt8('M') || x != 0x01
        error("input was not a valid BAM file")
    end

    # SAM header: length-prefixed plain text, parsed with the SAM reader.
    textlen = read(input, Int32)
    samreader = SAM.Reader(IOBuffer(read(input, UInt8, textlen)))

    # reference sequences: n_refs pairs of (length-prefixed name, length);
    # unsafe_string(pointer(data)) assumes the name bytes are NUL-terminated.
    refseqnames = String[]
    refseqlens = Int[]
    n_refs = read(input, Int32)
    for _ in 1:n_refs
        namelen = read(input, Int32)
        data = read(input, UInt8, namelen)
        seqname = unsafe_string(pointer(data))
        seqlen = read(input, Int32)
        push!(refseqnames, seqname)
        push!(refseqlens, seqlen)
    end

    return Reader(input, index, samreader.header, refseqnames, refseqlens)
end

# Construct a Reader from a readable IO object by wrapping it in a BGZF
# decompression stream; `index` is forwarded unchanged. (The span contained
# the removed old index-handling lines from the diff; only the post-change
# one-line delegation is kept.)
function Reader(input::IO; index=nothing)
    return Reader(BGZFStreams.BGZFStream(input), index=index)
end

# Construct a filepath-backed Reader. `index=:auto` looks for
# "<filepath>.bai" via `findbai`; an `AbstractString` index is loaded as a
# BAI file. Header and reference tables start empty — presumably they are
# populated when the reader is actually opened (see `Base.open`); TODO
# confirm against callers. (The span contained the removed old
# `init_bam_reader` tail lines from the diff; they are dropped here.)
function Reader(filepath::AbstractString; index=:auto)
    if index isa Symbol
        if index == :auto
            index = findbai(filepath)
        else
            throw(ArgumentError("invalid index: ':$(index)'"))
        end
    elseif index isa AbstractString
        index = BAI(index)
    end
    return Reader(filepath, index, SAM.Header(), String[], Int[])
end

# Open the file behind a filepath-backed reader, producing a stream-backed
# reader that carries over the same index.
Base.open(reader::Reader{String}) = Reader(open(reader.input), index=reader.index)

# Iterating a `Reader` yields `Record`s.
Base.eltype(::Type{Reader{T}}) where {T} = Record

# Expose the reader's underlying input as its stream.
Bio.IO.stream(reader::Reader) = reader.input

# Compact one-line display of a reader including its input source. (The span
# contained both the removed two-line "number of contigs" output and the new
# one-line output from the diff; only the post-change version is kept.)
function Base.show(io::IO, reader::Reader)
    print(io, summary(reader), "(<input=", repr(reader.input), ">)")
end

"""
Expand Down Expand Up @@ -70,81 +123,36 @@ function Bio.header(reader::Reader)
return header(reader)
end

function Base.seek(reader::Reader, voffset::BGZFStreams.VirtualOffset)
seek(reader.stream, voffset)
#function Base.seek(reader::Reader, voffset::BGZFStreams.VirtualOffset)
# seek(reader.stream, voffset)
#end
#
#function Base.seekstart(reader::Reader)
# seek(reader.stream, reader.start_offset)
#end

# Per-iteration state: the reader used for this iteration paired with a
# pre-allocated record buffer that is reused across reads.
struct ReaderState{S,T}
    # reader driving this iteration
    reader::S
    # pre-allocated record reused by each read
    record::T
end

# Build iteration state for a reader that already wraps an open BGZF stream:
# reuse the reader itself with a fresh record buffer. (The span contained the
# removed old `Base.seekstart` head lines from the diff; they are dropped.)
function ReaderState(reader::Reader{<:BGZFStreams.BGZFStream})
    return ReaderState(reader, Record())
end

# Build iteration state for a filepath-backed reader: open a fresh
# stream-backed reader so each iteration owns its own file handle. (The span
# contained the removed old `Base.start` head lines from the diff.)
function ReaderState(reader::Reader{String})
    return ReaderState(open(reader), Record())
end

# Begin iteration over all records: the state wraps the (possibly freshly
# opened) reader and a record buffer. (The span contained the removed old
# `Base.done` head lines from the diff.)
function Base.start(reader::Reader)
    return ReaderState(reader)
end

# Iteration is finished when the state's underlying input stream hits EOF.
# (The span contained the removed old `Base.next` lines from the diff; only
# the post-change `Base.done` is kept.)
function Base.done(::Reader, state)
    return eof(state.reader.input)
end

# Initialize a BAM reader by reading the header section.
# Reads, in order: the 4 magic bytes ("BAM\x01"), the length-prefixed SAM
# text header, and the reference-sequence dictionary, leaving the stream
# positioned at the first alignment record.
function init_bam_reader(input::BGZFStreams.BGZFStream)
    # magic bytes: must be 'B', 'A', 'M', 0x01
    B = read(input, UInt8)
    A = read(input, UInt8)
    M = read(input, UInt8)
    x = read(input, UInt8)
    if B != UInt8('B') || A != UInt8('A') || M != UInt8('M') || x != 0x01
        error("input was not a valid BAM file")
    end

    # SAM header: length-prefixed plain text parsed via the SAM reader
    textlen = read(input, Int32)
    samreader = SAM.Reader(IOBuffer(read(input, UInt8, textlen)))

    # reference sequences: n_refs pairs of (length-prefixed name, length);
    # unsafe_string(pointer(data)) assumes the name bytes are NUL-terminated
    refseqnames = String[]
    refseqlens = Int[]
    n_refs = read(input, Int32)
    for _ in 1:n_refs
        namelen = read(input, Int32)
        data = read(input, UInt8, namelen)
        seqname = unsafe_string(pointer(data))
        seqlen = read(input, Int32)
        push!(refseqnames, seqname)
        push!(refseqlens, seqlen)
    end

    # A Pipe is not seekable, so record offset (0, 0) in that case; otherwise
    # remember the current virtual offset (the start of the record section).
    voffset = isa(input.io, Pipe) ?
        BGZFStreams.VirtualOffset(0, 0) :
        BGZFStreams.virtualoffset(input)
    return Reader(
        input,
        samreader.header,
        voffset,
        refseqnames,
        refseqlens,
        Nullable{BAI}())
end

# Wrap a plain IO object in a BGZF decompression stream before initializing.
init_bam_reader(input::IO) = init_bam_reader(BGZFStreams.BGZFStream(input))

# Read the next record into the state's buffer and yield a copy of it. (The
# span fused the removed old `_read!` body with the new `Base.next` from the
# diff; only the post-change `Base.next` is kept — `_read!`'s logic now
# lives in `Base.read!` in record.jl.)
function Base.next(::Reader, state)
    read!(state.reader, state.record)
    # Copy so the caller's record is not overwritten by the next read.
    return copy(state.record), state
end
12 changes: 11 additions & 1 deletion src/bam/record.jl
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,17 @@ function Base.show(io::IO, record::Record)
end

# Read one BAM record in place. (The span contained the removed old
# delegation to `_read!` interleaved with the new inlined body from the
# diff; only the post-change body is kept.)
function Base.read!(reader::Reader, record::Record)
    # Read the fixed-size fields directly into the record object's memory.
    unsafe_read(
        reader.input,
        pointer_from_objref(record),
        FIXED_FIELDS_BYTES)
    # Then read the variable-length data, growing the buffer only when the
    # current one is too small (it is reused across records).
    dsize = data_size(record)
    if length(record.data) < dsize
        resize!(record.data, dsize)
    end
    unsafe_read(reader.input, pointer(record.data), dsize)
    # Link the record back to its reader (used by accessor functions).
    record.reader = reader
    return record
end


Expand Down
68 changes: 68 additions & 0 deletions transcript_depth.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
@everywhere begin
    using BioAlignments
    using GenomicFeatures

    # The main algorithm: per-base read depth of mapped primary alignments
    # over `interval`, returned as a Vector{Int} of length(interval).
    function compute_depth(reader, interval)
        positions = interval.first:interval.last
        depth = zeros(Int, length(positions))
        for record in eachoverlap(reader, interval)
            # Only mapped, primary alignments contribute to depth.
            (BAM.ismapped(record) && BAM.isprimary(record)) || continue
            aln = BAM.alignment(record)
            for seqpos in 1:BAM.seqlength(record)
                refpos, op = seq2ref(aln, seqpos)
                if ismatchop(op) && refpos in positions
                    @inbounds depth[refpos - first(positions) + 1] += 1
                end
            end
        end
        return depth
    end
end

# Sequential computation.
# Sequential computation: one shared reader, one interval at a time.
function transcript_depth0(bamfile, intervals)
    reader = BAM.Reader(bamfile)
    return [compute_depth(reader, interval) for interval in intervals]
end

# Parallel computation using pmap (open BAM.Reader inside the closure).
# Parallel computation using pmap, opening a BAM.Reader inside the closure
# (one reader per processed interval).
function transcript_depth1(bamfile, intervals, batchsize)
    return pmap(intervals, batch_size=batchsize) do interval
        compute_depth(BAM.Reader(bamfile), interval)
    end
end

# Parallel computation using pmap (open BAM.Reader outside the closure).
# Parallel computation using pmap with a single BAM.Reader opened outside
# the closure and captured by it.
function transcript_depth2(bamfile, intervals, batchsize)
    shared_reader = BAM.Reader(bamfile)
    return pmap(interval -> compute_depth(shared_reader, interval),
                intervals, batch_size=batchsize)
end

# Input data: a sorted BAM file and a GFF3 annotation file.
bamfile = expanduser("./data/SRR1238088.sort.bam")
gff3file = expanduser("./data/TAIR10_GFF3_genes.gff")
# Collect the gene intervals located on chromosome 1 from the GFF3 file.
intervals = collect(Interval, Iterators.filter(r->GFF3.seqid(r)=="Chr1" && GFF3.featuretype(r)=="gene", GFF3.Reader(open(gff3file))))

using DocOpt
# Command line: main.jl [--batch_size=<n>] <function>
args = docopt("Usage: main.jl [--batch_size=<n>] <function>")
batch_size = args["--batch_size"]
if batch_size == nothing
    # default pmap batch size when none is given on the command line
    batch_size = 30
else
    batch_size = parse(Int, batch_size)
end
# Resolve the benchmarked function from its name (transcript_depth0/1/2).
# NOTE(review): eval(parse(...)) on a command-line argument executes
# arbitrary code; acceptable for a local benchmark script only.
f = eval(parse(args["<function>"]))
# transcript_depth0 takes no batch-size argument; the others do.
func = () -> f == transcript_depth0 ? f(bamfile, intervals) : f(bamfile, intervals, batch_size)

# Print a checksum of the result, then time three runs, garbage-collecting
# before each so leftover garbage does not skew the measurement.
println(STDERR, sum(map(sum, func())))
for i in 1:3
    gc()
    println(@elapsed func())
end