remove global metadata cache, refactor custom_metadata API #238

Merged
merged 9 commits into from
Sep 14, 2021
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "Arrow"
uuid = "69666777-d1a9-59fb-9406-91d4454c9d45"
authors = ["quinnj <quinn.jacobd@gmail.com>"]
version = "1.6.2"
version = "2.0.0"

[deps]
ArrowTypes = "31f734f8-188a-4ce0-8406-c8a06bd891cd"
8 changes: 6 additions & 2 deletions docs/src/manual.md
@@ -159,9 +159,13 @@ This stuff can definitely make your eyes glaze over if you stare at it long enou

In addition to `Arrow.Table`, the Arrow.jl package also provides `Arrow.Stream` for processing arrow data. While `Arrow.Table` will iterate all record batches in an arrow file/stream, concatenating columns, `Arrow.Stream` provides a way to *iterate* through record batches, one at a time. Each iteration yields an `Arrow.Table` instance, with columns/data for a single record batch. This allows, if so desired, "batch processing" of arrow data, one record batch at a time, instead of creating a single long table via `Arrow.Table`.

### Table and column metadata
### Custom application metadata

The arrow format allows attaching arbitrary metadata in the form of a `Dict{String, String}` to tables and individual columns. The Arrow.jl package supports retrieving serialized metadata by calling `Arrow.getmetadata(table)` or `Arrow.getmetadata(column)`.
The Arrow format allows data producers to [attach custom metadata](https://arrow.apache.org/docs/format/Columnar.html#custom-application-metadata) to various Arrow objects.

Arrow.jl provides a convenient accessor for this metadata via [`Arrow.getmetadata`](@ref). `Arrow.getmetadata(t::Arrow.Table)` will return an immutable `AbstractDict{String,String}` that represents the [`custom_metadata` of the table's associated `Schema`](https://github.com/apache/arrow/blob/85d8175ea24b4dd99f108a673e9b63996d4f88cc/format/Schema.fbs#L515) (or `nothing` if no such metadata exists), while `Arrow.getmetadata(c::Arrow.ArrowVector)` will return a similar representation of [the column's associated `Field` `custom_metadata`](https://github.com/apache/arrow/blob/85d8175ea24b4dd99f108a673e9b63996d4f88cc/format/Schema.fbs#L480) (or `nothing` if no such metadata exists).

To attach custom schema/column metadata to Arrow tables at serialization time, see the `metadata` and `colmetadata` keyword arguments to [`Arrow.write`](@ref).
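
As a rough illustration of the round trip described above, the following sketch writes a small table with schema-level and column-level metadata and reads both back. The table contents and the metadata keys and values (`source`, `unit`) are made up for the example, and it assumes an Arrow.jl version that includes this change (2.0.0 or later).

```julia
using Arrow

# Illustrative data; any Tables.jl-compatible source should work here.
tbl = (a = [1, 2, 3], b = ["x", "y", "z"])

io = IOBuffer()
Arrow.write(io, tbl;
    # schema-level custom_metadata: an iterable of string pairs
    metadata = ["source" => "sensor-7"],
    # column-level custom_metadata: column name => iterable of string pairs
    colmetadata = Dict(:a => ["unit" => "m"]),
)
seekstart(io)

t = Arrow.Table(io)

table_meta = Arrow.getmetadata(t)     # immutable AbstractDict{String,String}
column_meta = Arrow.getmetadata(t.a)  # metadata attached to column `a`

println(table_meta["source"])
println(column_meta["unit"])
```

Because the returned dictionaries are immutable, changing metadata means writing the table again with new `metadata` or `colmetadata` values rather than mutating the result of `Arrow.getmetadata`.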

## Writing arrow data

2 changes: 1 addition & 1 deletion src/Arrow.jl
@@ -41,6 +41,7 @@ See docs for official Arrow.jl API with the [User Manual](@ref) and reference do
"""
module Arrow

using Base.Iterators
using Mmap
import Dates
using DataAPI, Tables, SentinelArrays, PooledArrays, CodecLz4, CodecZstd, TimeZones, BitIntegers
@@ -106,7 +107,6 @@ function __init__()
CodecLz4.TranscodingStreams.initialize(lz4)
push!(LZ4_FRAME_COMPRESSOR, lz4)
end
OBJ_METADATA_LOCK[] = ReentrantLock()
return
end

2 changes: 1 addition & 1 deletion src/ArrowTypes/Project.toml
@@ -1,7 +1,7 @@
name = "ArrowTypes"
uuid = "31f734f8-188a-4ce0-8406-c8a06bd891cd"
authors = ["quinnj <quinn.jacobd@gmail.com>"]
version = "1.2.1"
version = "1.2.2"


[deps]
7 changes: 0 additions & 7 deletions src/ArrowTypes/src/ArrowTypes.jl
@@ -366,13 +366,6 @@ const JULIA_TO_ARROW_TYPE_MAPPING = Dict{Type, Tuple{String, Type}}()

istyperegistered(::Type{T}) where {T} = haskey(JULIA_TO_ARROW_TYPE_MAPPING, T)

function getarrowtype!(meta, ::Type{T}) where {T}
arrowname, arrowtype = JULIA_TO_ARROW_TYPE_MAPPING[T]
meta["ARROW:extension:name"] = arrowname
meta["ARROW:extension:metadata"] = ""
return arrowtype
end

const ARROW_TO_JULIA_TYPE_MAPPING = Dict{String, Tuple{Type, Type}}()

function extensiontype(f, meta)
12 changes: 8 additions & 4 deletions src/append.jl
@@ -27,11 +27,13 @@ or the `JULIA_NUM_THREADS` environment variable is set).

Supported keyword arguments to `Arrow.append` include:
* `alignment::Int=8`: specify the number of bytes to align buffers to when written in messages; strongly recommended to only use alignment values of 8 or 64 for modern memory cache line optimization
* `colmetadata=nothing`: the metadata that should be written as the table's columns' `custom_metadata` fields; must either be `nothing` or an `AbstractDict` of `column_name::Symbol => column_metadata` where `column_metadata` is an iterable of `<:AbstractString` pairs.
* `dictencode::Bool=false`: whether all columns should use dictionary encoding when being written; to dict encode specific columns, wrap the column/array in `Arrow.DictEncode(col)`
* `dictencodenested::Bool=false`: whether nested data type columns should also dict encode nested arrays/buffers; other language implementations [may not support this](https://arrow.apache.org/docs/status.html)
* `denseunions::Bool=true`: whether Julia `Vector{<:Union}` arrays should be written using the dense union layout; passing `false` will result in the sparse union layout
* `largelists::Bool=false`: causes list column types to be written with Int64 offset arrays; mainly for testing purposes; by default, Int64 offsets will be used only if needed
* `maxdepth::Int=$DEFAULT_MAX_DEPTH`: deepest allowed nested serialization level; this is provided by default to prevent accidental infinite recursion with mutually recursive data structures
* `metadata=Arrow.getmetadata(tbl)`: the metadata that should be written as the table's schema's `custom_metadata` field; must either be `nothing` or an iterable of `<:AbstractString` pairs.
* `ntasks::Int`: number of concurrent threaded tasks to allow while writing input partitions out as arrow record batches; default is no limit; to disable multithreaded writing, pass `ntasks=1`
* `convert::Bool`: whether certain arrow primitive types in the schema of `file` should be converted to Julia defaults for matching them to the schema of `tbl`; by default, `convert=true`.
* `file::Bool`: applicable when an `IO` is provided, whether it is a file; by default `file=false`.
@@ -49,6 +51,8 @@ function append(file::String, tbl; kwargs...)
end

function append(io::IO, tbl;
metadata=getmetadata(tbl),
colmetadata=nothing,
largelists::Bool=false,
denseunions::Bool=true,
dictencode::Bool=false,
@@ -76,12 +80,12 @@ function append(io::IO, tbl;
throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`"))
end

append(io, tbl, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks)
append(io, tbl, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata)

return io
end

function append(io::IO, source, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks)
function append(io::IO, source, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, meta, colmeta)
seekend(io)
skip(io, -8) # overwrite last 8 bytes of last empty message footer

@@ -113,9 +117,9 @@ function append(io::IO, source, arrow_schema, compress, largelists, denseunions,
end

if threaded
Threads.@spawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
Threads.@spawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta)
else
@async process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror)
@async process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta)
end
end
if anyerror[]
27 changes: 20 additions & 7 deletions src/arraytypes/arraytypes.jl
@@ -62,19 +62,32 @@ function arrowvector(x, i, nl, fi, de, ded, meta; dictencoding::Bool=false, dict
end
S = maybemissing(eltype(x))
if ArrowTypes.hasarrowname(T)
meta = meta === nothing ? Dict{String, String}() : meta
meta["ARROW:extension:name"] = String(ArrowTypes.arrowname(T))
meta["ARROW:extension:metadata"] = String(ArrowTypes.arrowmetadata(T))
meta = _arrowtypemeta(_normalizemeta(meta), String(ArrowTypes.arrowname(T)), String(ArrowTypes.arrowmetadata(T)))
end
return arrowvector(S, x, i, nl, fi, de, ded, meta; dictencode=dictencode, maxdepth=maxdepth, kw...)
end

_normalizemeta(::Nothing) = nothing
_normalizemeta(meta) = toidict(String(k) => String(v) for (k, v) in meta)

function _arrowtypemeta(::Nothing, n, m)
    return toidict(("ARROW:extension:name" => n, "ARROW:extension:metadata" => m))
end

function _arrowtypemeta(meta, n, m)
    dict = Dict(meta)
    dict["ARROW:extension:name"] = n
    dict["ARROW:extension:metadata"] = m
    return toidict(dict)
end
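
The second `_arrowtypemeta` method above follows a "copy, update, freeze" pattern: because the stored metadata is now immutable, extension keys are added to a mutable copy, which is then converted back to a `Base.ImmutableDict`. A minimal, Base-only sketch of that pattern is shown here; `with_extension_keys` and the sample values are hypothetical stand-ins for illustration, not part of Arrow.jl.

```julia
# Hypothetical helper illustrating the pattern; not Arrow.jl API.
function with_extension_keys(meta::Base.ImmutableDict{String,String},
                             name::String, extmeta::String)
    dict = Dict(meta)                          # mutable copy of the input
    dict["ARROW:extension:name"] = name
    dict["ARROW:extension:metadata"] = extmeta
    # Rebuild an immutable dictionary, one pair at a time.
    frozen = Base.ImmutableDict{String,String}()
    for (k, v) in dict
        frozen = Base.ImmutableDict(frozen, k => v)
    end
    return frozen
end

original = Base.ImmutableDict("unit" => "m")
updated = with_extension_keys(original, "JuliaLang.MyType", "")

println(haskey(original, "ARROW:extension:name"))  # the input is left untouched
println(updated["ARROW:extension:name"])
```

The cost is an extra allocation per update, which seems a reasonable trade for metadata that is typically small and no longer shared through a global cache.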

# now we check for ArrowType conversions and dispatch on ArrowKind
function arrowvector(::Type{S}, x, i, nl, fi, de, ded, meta; kw...) where {S}
meta = _normalizemeta(meta)
# deprecated and will be removed
if ArrowTypes.istyperegistered(S)
meta = meta === nothing ? Dict{String, String}() : meta
arrowtype = ArrowTypes.getarrowtype!(meta, S)
arrowname, arrowtype = ArrowTypes.JULIA_TO_ARROW_TYPE_MAPPING[S]
meta = _arrowtypemeta(meta, arrowname, "")
if arrowtype === S
return arrowvector(ArrowKind(S), x, i, nl, fi, de, ded, meta; kw...)
else
@@ -87,12 +100,12 @@ end

struct NullVector{T} <: ArrowVector{T}
data::MissingVector
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String, String}}
end
Base.size(v::NullVector) = (length(v.data),)
Base.getindex(v::NullVector{T}, i::Int) where {T} = ArrowTypes.fromarrow(T, getindex(v.data, i))

arrowvector(::NullKind, x, i, nl, fi, de, ded, meta; kw...) = NullVector{eltype(x)}(MissingVector(length(x)), meta)
arrowvector(::NullKind, x, i, nl, fi, de, ded, meta; kw...) = NullVector{eltype(x)}(MissingVector(length(x)), isnothing(meta) ? nothing : toidict(meta))
compress(Z::Meta.CompressionType, comp, v::NullVector) =
Compressed{Z, NullVector}(v, CompressedBuffer[], length(v), length(v), Compressed[])

2 changes: 1 addition & 1 deletion src/arraytypes/bool.jl
@@ -25,7 +25,7 @@ struct BoolVector{T} <: ArrowVector{T}
pos::Int
validity::ValidityBitmap
ℓ::Int64
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String, String}}
end

Base.size(p::BoolVector) = (p.ℓ,)
6 changes: 3 additions & 3 deletions src/arraytypes/dictencoding.jl
@@ -35,7 +35,7 @@ mutable struct DictEncoding{T, S, A} <: ArrowVector{T}
id::Int64
data::A
isOrdered::Bool
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String, String}}
end

indextype(::Type{DictEncoding{T, S, A}}) where {T, S, A} = S
@@ -91,7 +91,7 @@ struct DictEncoded{T, S, A} <: ArrowVector{T}
validity::ValidityBitmap
indices::Vector{S}
encoding::DictEncoding{T, S, A}
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String, String}}
end

DictEncoded(b::Vector{UInt8}, v::ValidityBitmap, inds::Vector{S}, encoding::DictEncoding{T, S, A}, meta) where {S, T, A} =
@@ -229,7 +229,7 @@ function arrowvector(::DictEncodedKind, x, i, nl, fi, de, ded, meta; dictencode:
end
end
if meta !== nothing && getmetadata(encoding) !== nothing
merge!(meta, getmetadata(encoding))
meta = toidict(merge!(Dict(meta), Dict(getmetadata(encoding))))
elseif getmetadata(encoding) !== nothing
meta = getmetadata(encoding)
end
2 changes: 1 addition & 1 deletion src/arraytypes/fixedsizelist.jl
@@ -24,7 +24,7 @@ struct FixedSizeList{T, A <: AbstractVector} <: ArrowVector{T}
validity::ValidityBitmap
data::A
ℓ::Int
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Base.size(l::FixedSizeList) = (l.ℓ,)
2 changes: 1 addition & 1 deletion src/arraytypes/list.jl
@@ -39,7 +39,7 @@ struct List{T, O, A} <: ArrowVector{T}
offsets::Offsets{O}
data::A
ℓ::Int
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Base.size(l::List) = (l.ℓ,)
2 changes: 1 addition & 1 deletion src/arraytypes/map.jl
@@ -24,7 +24,7 @@ struct Map{T, O, A} <: ArrowVector{T}
offsets::Offsets{O}
data::A
ℓ::Int
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Base.size(l::Map) = (l.ℓ,)
2 changes: 1 addition & 1 deletion src/arraytypes/primitive.jl
@@ -24,7 +24,7 @@ struct Primitive{T, A} <: ArrowVector{T}
validity::ValidityBitmap
data::A
ℓ::Int64
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Primitive(::Type{T}, b::Vector{UInt8}, v::ValidityBitmap, data::A, l, meta) where {T, A} =
2 changes: 1 addition & 1 deletion src/arraytypes/struct.jl
@@ -23,7 +23,7 @@ struct Struct{T, S} <: ArrowVector{T}
validity::ValidityBitmap
data::S # Tuple of ArrowVector
ℓ::Int
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Base.size(s::Struct) = (s.ℓ,)
4 changes: 2 additions & 2 deletions src/arraytypes/unions.jl
@@ -63,7 +63,7 @@ struct DenseUnion{T, U, S} <: ArrowVector{T}
typeIds::Vector{UInt8}
offsets::Vector{Int32}
data::S # Tuple of ArrowVector
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Base.size(s::DenseUnion) = size(s.typeIds)
@@ -185,7 +185,7 @@ struct SparseUnion{T, U, S} <: ArrowVector{T}
arrow::Vector{UInt8} # need to hold a reference to arrow memory blob
typeIds::Vector{UInt8}
data::S # Tuple of ArrowVector
metadata::Union{Nothing, Dict{String, String}}
metadata::Union{Nothing, Base.ImmutableDict{String,String}}
end

Base.size(s::SparseUnion) = size(s.typeIds)
2 changes: 1 addition & 1 deletion src/eltypes.jl
@@ -39,7 +39,7 @@ function juliaeltype(f::Meta.Field, ::Nothing, convert::Bool)
return convert ? finaljuliatype(T) : T
end

function juliaeltype(f::Meta.Field, meta::Dict{String, String}, convert::Bool)
function juliaeltype(f::Meta.Field, meta::AbstractDict{String, String}, convert::Bool)
TT = juliaeltype(f, convert)
!convert && return TT
T = finaljuliatype(TT)
39 changes: 27 additions & 12 deletions src/table.jl
@@ -182,19 +182,37 @@ struct Table <: Tables.AbstractColumns
columns::Vector{AbstractVector}
lookup::Dict{Symbol, AbstractVector}
schema::Ref{Meta.Schema}
metadata::Ref{Dict{String, String}}
metadata::Ref{Union{Nothing,Base.ImmutableDict{String,String}}}
end

Table() = Table(Symbol[], Type[], AbstractVector[], Dict{Symbol, AbstractVector}(), Ref{Meta.Schema}(), Ref{Dict{String, String}}())
Table(names, types, columns, lookup, schema) = Table(names, types, columns, lookup, schema, Ref{Dict{String, String}}())
Table() = Table(Symbol[], Type[], AbstractVector[], Dict{Symbol, AbstractVector}(), Ref{Meta.Schema}(), Ref{Union{Nothing,Base.ImmutableDict{String,String}}}(nothing))

function Table(names, types, columns, lookup, schema)
m = isassigned(schema) ? buildmetadata(schema[]) : nothing
return Table(names, types, columns, lookup, schema, Ref{Union{Nothing,Base.ImmutableDict{String,String}}}(m))
end

names(t::Table) = getfield(t, :names)
types(t::Table) = getfield(t, :types)
columns(t::Table) = getfield(t, :columns)
lookup(t::Table) = getfield(t, :lookup)
schema(t::Table) = getfield(t, :schema)
getmetadata(t::Table) = isdefined(getfield(t, :metadata), :x) ? getfield(t, :metadata)[] : nothing
setmetadata!(t::Table, m::Dict{String, String}) = (setindex!(getfield(t, :metadata), m); nothing)

"""
Arrow.getmetadata(x)

If `x isa Arrow.Table` return a `Base.ImmutableDict{String,String}` representation of `x`'s
`Schema` `custom_metadata`, or `nothing` if no such metadata exists.

If `x isa Arrow.ArrowVector`, return a `Base.ImmutableDict{String,String}` representation of `x`'s
`Field` `custom_metadata`, or `nothing` if no such metadata exists.

Otherwise, return `nothing`.

See [the official Arrow documentation for more details on custom application metadata](https://arrow.apache.org/docs/format/Columnar.html#custom-application-metadata).
"""
getmetadata(t::Table) = getfield(t, :metadata)[]
getmetadata(::Any) = nothing

Tables.istable(::Table) = true
Tables.columnaccess(::Table) = true
@@ -306,10 +324,7 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin
lu[nm] = col
push!(ty, eltype(col))
end
meta = sch !== nothing ? sch.custom_metadata : nothing
if meta !== nothing
getfield(t, :metadata)[] = buildmetadata(meta)
end
getfield(t, :metadata)[] = buildmetadata(sch)
return t
end

@@ -366,10 +381,10 @@ struct VectorIterator
convert::Bool
end

buildmetadata(f::Meta.Field) = buildmetadata(f.custom_metadata)
buildmetadata(meta) = Dict(String(kv.key) => String(kv.value) for kv in meta)
buildmetadata(f::Union{Meta.Field,Meta.Schema}) = buildmetadata(f.custom_metadata)
buildmetadata(meta) = toidict(String(kv.key) => String(kv.value) for kv in meta)
buildmetadata(::Nothing) = nothing
buildmetadata(x::Dict{String, String}) = x
buildmetadata(x::AbstractDict) = x

function Base.iterate(x::VectorIterator, (columnidx, nodeidx, bufferidx)=(Int64(1), Int64(1), Int64(1)))
columnidx > length(x.schema.fields) && return nothing
11 changes: 11 additions & 0 deletions src/utils.jl
@@ -207,3 +207,14 @@ function tobuffer(data; kwargs...)
seekstart(io)
return io
end

toidict(x::Base.ImmutableDict) = x

# ref https://github.com/JuliaData/Arrow.jl/pull/238#issuecomment-919415809
function toidict(pairs)
    dict = Base.ImmutableDict(first(pairs))
    for pair in Iterators.drop(pairs, 1)
        dict = Base.ImmutableDict(dict, pair)
    end
    return dict
end
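
To see how a helper of this shape behaves, here is a small Base-only sketch. `toidict_sketch` is a hypothetical copy of the function above under a different name, used so the example does not depend on Arrow.jl internals; the sample keys and values are invented.

```julia
# Hypothetical stand-in that mirrors the `toidict` shown in the diff.
toidict_sketch(x::Base.ImmutableDict) = x  # already immutable: return as-is

function toidict_sketch(pairs)
    # Seed the dictionary with the first pair, then prepend the rest one by one.
    dict = Base.ImmutableDict(first(pairs))
    for pair in Iterators.drop(pairs, 1)
        dict = Base.ImmutableDict(dict, pair)
    end
    return dict
end

# Works with any iterable of pairs, including a generator.
raw = Dict(:unit => "m", :source => "sensor-7")
meta = toidict_sketch(String(k) => String(v) for (k, v) in raw)

println(meta["unit"])
println(toidict_sketch(meta) === meta)  # the identity method avoids a rebuild
```

Since the first pair is taken with `first(pairs)`, a helper written this way appears to assume a non-empty input; callers in the diff guard the `nothing` case separately before reaching it.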
jrevels marked this conversation as resolved.