From 967e3c67437219698afae889a13e881b60033fe3 Mon Sep 17 00:00:00 2001 From: Jarrett Revels Date: Tue, 14 Sep 2021 15:27:57 -0400 Subject: [PATCH] remove global metadata cache, refactor custom_metadata API (#238) Co-authored-by: Eric Hanson <5846501+ericphanson@users.noreply.github.com> Co-authored-by: Jacob Quinn --- Project.toml | 2 +- docs/src/manual.md | 8 +++- src/Arrow.jl | 2 +- src/ArrowTypes/Project.toml | 2 +- src/ArrowTypes/src/ArrowTypes.jl | 7 ---- src/append.jl | 12 ++++-- src/arraytypes/arraytypes.jl | 27 +++++++++---- src/arraytypes/bool.jl | 2 +- src/arraytypes/dictencoding.jl | 6 +-- src/arraytypes/fixedsizelist.jl | 2 +- src/arraytypes/list.jl | 2 +- src/arraytypes/map.jl | 2 +- src/arraytypes/primitive.jl | 2 +- src/arraytypes/struct.jl | 2 +- src/arraytypes/unions.jl | 4 +- src/eltypes.jl | 2 +- src/table.jl | 39 +++++++++++++------ src/utils.jl | 11 ++++++ src/write.jl | 65 +++++++++----------------------- test/runtests.jl | 27 +++++++------ 20 files changed, 120 insertions(+), 106 deletions(-) diff --git a/Project.toml b/Project.toml index 46bff781..087128da 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "Arrow" uuid = "69666777-d1a9-59fb-9406-91d4454c9d45" authors = ["quinnj "] -version = "1.6.2" +version = "2.0.0" [deps] ArrowTypes = "31f734f8-188a-4ce0-8406-c8a06bd891cd" diff --git a/docs/src/manual.md b/docs/src/manual.md index 5cd259be..f1bac34f 100644 --- a/docs/src/manual.md +++ b/docs/src/manual.md @@ -159,9 +159,13 @@ This stuff can definitely make your eyes glaze over if you stare at it long enou In addition to `Arrow.Table`, the Arrow.jl package also provides `Arrow.Stream` for processing arrow data. While `Arrow.Table` will iterate all record batches in an arrow file/stream, concatenating columns, `Arrow.Stream` provides a way to *iterate* through record batches, one at a time. Each iteration yields an `Arrow.Table` instance, with columns/data for a single record batch. This allows, if so desired, "batch processing" of arrow data, one record batch at a time, instead of creating a single long table via `Arrow.Table`. -### Table and column metadata +### Custom application metadata -The arrow format allows attaching arbitrary metadata in the form of a `Dict{String, String}` to tables and individual columns. The Arrow.jl package supports retrieving serialized metadata by calling `Arrow.getmetadata(table)` or `Arrow.getmetadata(column)`. +The Arrow format allows data producers to [attach custom metadata](https://arrow.apache.org/docs/format/Columnar.html#custom-application-metadata) to various Arrow objects. + +Arrow.jl provides a convenient accessor for this metadata via [`Arrow.getmetadata`](@ref). `Arrow.getmetadata(t::Arrow.Table)` will return an immutable `AbstractDict{String,String}` that represents the [`custom_metadata` of the table's associated `Schema`](https://github.com/apache/arrow/blob/85d8175ea24b4dd99f108a673e9b63996d4f88cc/format/Schema.fbs#L515) (or `nothing` if no such metadata exists), while `Arrow.getmetadata(c::Arrow.ArrowVector)` will return a similar representation of [the column's associated `Field` `custom_metadata`](https://github.com/apache/arrow/blob/85d8175ea24b4dd99f108a673e9b63996d4f88cc/format/Schema.fbs#L480) (or `nothing` if no such metadata exists). + +To attach custom schema/column metadata to Arrow tables at serialization time, see the `metadata` and `colmetadata` keyword arguments to [`Arrow.write`](@ref). 
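For illustration, here is a minimal round-trip sketch using these keyword arguments; the column name and the metadata keys/values below are arbitrary placeholders, not part of the Arrow format:

```julia
using Arrow

tbl = (col1 = [1, 2, 3],)
io = IOBuffer()
Arrow.write(io, tbl;
            metadata = Dict("source" => "example"),          # schema-level custom_metadata
            colmetadata = Dict(:col1 => Dict("unit" => "m"))) # column-level custom_metadata
out = Arrow.Table(take!(io))
Arrow.getmetadata(out)       # Base.ImmutableDict with "source" => "example"
Arrow.getmetadata(out.col1)  # Base.ImmutableDict with "unit" => "m"
```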
## Writing arrow data diff --git a/src/Arrow.jl b/src/Arrow.jl index c3ace2c1..a623031c 100644 --- a/src/Arrow.jl +++ b/src/Arrow.jl @@ -41,6 +41,7 @@ See docs for official Arrow.jl API with the [User Manual](@ref) and reference do """ module Arrow +using Base.Iterators using Mmap import Dates using DataAPI, Tables, SentinelArrays, PooledArrays, CodecLz4, CodecZstd, TimeZones, BitIntegers @@ -106,7 +107,6 @@ function __init__() CodecLz4.TranscodingStreams.initialize(lz4) push!(LZ4_FRAME_COMPRESSOR, lz4) end - OBJ_METADATA_LOCK[] = ReentrantLock() return end diff --git a/src/ArrowTypes/Project.toml b/src/ArrowTypes/Project.toml index f0861e39..47c2e4f4 100644 --- a/src/ArrowTypes/Project.toml +++ b/src/ArrowTypes/Project.toml @@ -1,7 +1,7 @@ name = "ArrowTypes" uuid = "31f734f8-188a-4ce0-8406-c8a06bd891cd" authors = ["quinnj "] -version = "1.2.1" +version = "1.2.2" [deps] diff --git a/src/ArrowTypes/src/ArrowTypes.jl b/src/ArrowTypes/src/ArrowTypes.jl index ce9578c9..386c7d49 100644 --- a/src/ArrowTypes/src/ArrowTypes.jl +++ b/src/ArrowTypes/src/ArrowTypes.jl @@ -366,13 +366,6 @@ const JULIA_TO_ARROW_TYPE_MAPPING = Dict{Type, Tuple{String, Type}}() istyperegistered(::Type{T}) where {T} = haskey(JULIA_TO_ARROW_TYPE_MAPPING, T) -function getarrowtype!(meta, ::Type{T}) where {T} - arrowname, arrowtype = JULIA_TO_ARROW_TYPE_MAPPING[T] - meta["ARROW:extension:name"] = arrowname - meta["ARROW:extension:metadata"] = "" - return arrowtype -end - const ARROW_TO_JULIA_TYPE_MAPPING = Dict{String, Tuple{Type, Type}}() function extensiontype(f, meta) diff --git a/src/append.jl b/src/append.jl index 8a4d044e..8fc24dd8 100644 --- a/src/append.jl +++ b/src/append.jl @@ -27,11 +27,13 @@ or the `JULIA_NUM_THREADS` environment variable is set). Supported keyword arguments to `Arrow.append` include: * `alignment::Int=8`: specify the number of bytes to align buffers to when written in messages; strongly recommended to only use alignment values of 8 or 64 for modern memory cache line optimization + * `colmetadata=nothing`: the metadata that should be written as the table's columns' `custom_metadata` fields; must either be `nothing` or an `AbstractDict` of `column_name::Symbol => column_metadata` where `column_metadata` is an iterable of `<:AbstractString` pairs. * `dictencode::Bool=false`: whether all columns should use dictionary encoding when being written; to dict encode specific columns, wrap the column/array in `Arrow.DictEncode(col)` * `dictencodenested::Bool=false`: whether nested data type columns should also dict encode nested arrays/buffers; other language implementations [may not support this](https://arrow.apache.org/docs/status.html) * `denseunions::Bool=true`: whether Julia `Vector{<:Union}` arrays should be written using the dense union layout; passing `false` will result in the sparse union layout * `largelists::Bool=false`: causes list column types to be written with Int64 offset arrays; mainly for testing purposes; by default, Int64 offsets will be used only if needed * `maxdepth::Int=$DEFAULT_MAX_DEPTH`: deepest allowed nested serialization level; this is provided by default to prevent accidental infinite recursion with mutually recursive data structures + * `metadata=Arrow.getmetadata(tbl)`: the metadata that should be written as the table's schema's `custom_metadata` field; must either be `nothing` or an iterable of `<:AbstractString` pairs. 
* `ntasks::Int`: number of concurrent threaded tasks to allow while writing input partitions out as arrow record batches; default is no limit; to disable multithreaded writing, pass `ntasks=1` * `convert::Bool`: whether certain arrow primitive types in the schema of `file` should be converted to Julia defaults for matching them to the schema of `tbl`; by default, `convert=true`. * `file::Bool`: applicable when an `IO` is provided, whether it is a file; by default `file=false`. @@ -49,6 +51,8 @@ function append(file::String, tbl; kwargs...) end function append(io::IO, tbl; + metadata=getmetadata(tbl), + colmetadata=nothing, largelists::Bool=false, denseunions::Bool=true, dictencode::Bool=false, @@ -76,12 +80,12 @@ function append(io::IO, tbl; throw(ArgumentError("unsupported compress keyword argument value: $compress. Valid values include `:lz4` or `:zstd`")) end - append(io, tbl, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks) + append(io, tbl, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata) return io end -function append(io::IO, source, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks) +function append(io::IO, source, arrow_schema, compress, largelists, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, meta, colmeta) seekend(io) skip(io, -8) # overwrite last 8 bytes of last empty message footer @@ -113,9 +117,9 @@ function append(io::IO, source, arrow_schema, compress, largelists, denseunions, end if threaded - Threads.@spawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror) + Threads.@spawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta) else - @async process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror) + @async process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta) end end if anyerror[] diff --git a/src/arraytypes/arraytypes.jl b/src/arraytypes/arraytypes.jl index 6db185a6..beba66f3 100644 --- a/src/arraytypes/arraytypes.jl +++ b/src/arraytypes/arraytypes.jl @@ -62,19 +62,32 @@ function arrowvector(x, i, nl, fi, de, ded, meta; dictencoding::Bool=false, dict end S = maybemissing(eltype(x)) if ArrowTypes.hasarrowname(T) - meta = meta === nothing ? Dict{String, String}() : meta - meta["ARROW:extension:name"] = String(ArrowTypes.arrowname(T)) - meta["ARROW:extension:metadata"] = String(ArrowTypes.arrowmetadata(T)) + meta = _arrowtypemeta(_normalizemeta(meta), String(ArrowTypes.arrowname(T)), String(ArrowTypes.arrowmetadata(T))) end return arrowvector(S, x, i, nl, fi, de, ded, meta; dictencode=dictencode, maxdepth=maxdepth, kw...) 
end +_normalizemeta(::Nothing) = nothing +_normalizemeta(meta) = toidict(String(k) => String(v) for (k, v) in meta) + +function _arrowtypemeta(::Nothing, n, m) + return toidict(("ARROW:extension:name" => n, "ARROW:extension:metadata" => m)) +end + +function _arrowtypemeta(meta, n, m) + dict = Dict(meta) + dict["ARROW:extension:name"] = n + dict["ARROW:extension:metadata"] = m + return toidict(dict) +end + # now we check for ArrowType converions and dispatch on ArrowKind function arrowvector(::Type{S}, x, i, nl, fi, de, ded, meta; kw...) where {S} + meta = _normalizemeta(meta) # deprecated and will be removed if ArrowTypes.istyperegistered(S) - meta = meta === nothing ? Dict{String, String}() : meta - arrowtype = ArrowTypes.getarrowtype!(meta, S) + arrowname, arrowtype = ArrowTypes.JULIA_TO_ARROW_TYPE_MAPPING[S] + meta = _arrowtypemeta(meta, arrowname, "") if arrowtype === S return arrowvector(ArrowKind(S), x, i, nl, fi, de, ded, meta; kw...) else @@ -87,12 +100,12 @@ end struct NullVector{T} <: ArrowVector{T} data::MissingVector - metadata::Union{Nothing, Dict{String, String}} + metadata::Union{Nothing, Base.ImmutableDict{String, String}} end Base.size(v::NullVector) = (length(v.data),) Base.getindex(v::NullVector{T}, i::Int) where {T} = ArrowTypes.fromarrow(T, getindex(v.data, i)) -arrowvector(::NullKind, x, i, nl, fi, de, ded, meta; kw...) = NullVector{eltype(x)}(MissingVector(length(x)), meta) +arrowvector(::NullKind, x, i, nl, fi, de, ded, meta; kw...) = NullVector{eltype(x)}(MissingVector(length(x)), isnothing(meta) ? nothing : toidict(meta)) compress(Z::Meta.CompressionType, comp, v::NullVector) = Compressed{Z, NullVector}(v, CompressedBuffer[], length(v), length(v), Compressed[]) diff --git a/src/arraytypes/bool.jl b/src/arraytypes/bool.jl index 021803d0..c11ceb69 100644 --- a/src/arraytypes/bool.jl +++ b/src/arraytypes/bool.jl @@ -25,7 +25,7 @@ struct BoolVector{T} <: ArrowVector{T} pos::Int validity::ValidityBitmap ℓ::Int64 - metadata::Union{Nothing, Dict{String, String}} + metadata::Union{Nothing, Base.ImmutableDict{String, String}} end Base.size(p::BoolVector) = (p.ℓ,) diff --git a/src/arraytypes/dictencoding.jl b/src/arraytypes/dictencoding.jl index 62eb0f62..505e2753 100644 --- a/src/arraytypes/dictencoding.jl +++ b/src/arraytypes/dictencoding.jl @@ -35,7 +35,7 @@ mutable struct DictEncoding{T, S, A} <: ArrowVector{T} id::Int64 data::A isOrdered::Bool - metadata::Union{Nothing, Dict{String, String}} + metadata::Union{Nothing, Base.ImmutableDict{String, String}} end indextype(::Type{DictEncoding{T, S, A}}) where {T, S, A} = S @@ -91,7 +91,7 @@ struct DictEncoded{T, S, A} <: ArrowVector{T} validity::ValidityBitmap indices::Vector{S} encoding::DictEncoding{T, S, A} - metadata::Union{Nothing, Dict{String, String}} + metadata::Union{Nothing, Base.ImmutableDict{String, String}} end DictEncoded(b::Vector{UInt8}, v::ValidityBitmap, inds::Vector{S}, encoding::DictEncoding{T, S, A}, meta) where {S, T, A} = @@ -229,7 +229,7 @@ function arrowvector(::DictEncodedKind, x, i, nl, fi, de, ded, meta; dictencode: end end if meta !== nothing && getmetadata(encoding) !== nothing - merge!(meta, getmetadata(encoding)) + meta = toidict(merge!(Dict(meta), Dict(getmetadata(encoding)))) elseif getmetadata(encoding) !== nothing meta = getmetadata(encoding) end diff --git a/src/arraytypes/fixedsizelist.jl b/src/arraytypes/fixedsizelist.jl index 0d30c27c..98e5f4b8 100644 --- a/src/arraytypes/fixedsizelist.jl +++ b/src/arraytypes/fixedsizelist.jl @@ -24,7 +24,7 @@ struct FixedSizeList{T, A <: 
AbstractVector} <: ArrowVector{T} validity::ValidityBitmap data::A ℓ::Int - metadata::Union{Nothing, Dict{String, String}} + metadata::Union{Nothing, Base.ImmutableDict{String,String}} end Base.size(l::FixedSizeList) = (l.ℓ,) diff --git a/src/arraytypes/list.jl b/src/arraytypes/list.jl index e5dd12df..1525f380 100644 --- a/src/arraytypes/list.jl +++ b/src/arraytypes/list.jl @@ -39,7 +39,7 @@ struct List{T, O, A} <: ArrowVector{T} offsets::Offsets{O} data::A ℓ::Int - metadata::Union{Nothing, Dict{String, String}} + metadata::Union{Nothing, Base.ImmutableDict{String,String}} end Base.size(l::List) = (l.ℓ,) diff --git a/src/arraytypes/map.jl b/src/arraytypes/map.jl index 0cd2e3d2..3664f7d6 100644 --- a/src/arraytypes/map.jl +++ b/src/arraytypes/map.jl @@ -24,7 +24,7 @@ struct Map{T, O, A} <: ArrowVector{T} offsets::Offsets{O} data::A ℓ::Int - metadata::Union{Nothing, Dict{String, String}} + metadata::Union{Nothing, Base.ImmutableDict{String,String}} end Base.size(l::Map) = (l.ℓ,) diff --git a/src/arraytypes/primitive.jl b/src/arraytypes/primitive.jl index 9ef29b1a..3612b430 100644 --- a/src/arraytypes/primitive.jl +++ b/src/arraytypes/primitive.jl @@ -24,7 +24,7 @@ struct Primitive{T, A} <: ArrowVector{T} validity::ValidityBitmap data::A ℓ::Int64 - metadata::Union{Nothing, Dict{String, String}} + metadata::Union{Nothing, Base.ImmutableDict{String,String}} end Primitive(::Type{T}, b::Vector{UInt8}, v::ValidityBitmap, data::A, l, meta) where {T, A} = diff --git a/src/arraytypes/struct.jl b/src/arraytypes/struct.jl index 3f383bc0..e32a0168 100644 --- a/src/arraytypes/struct.jl +++ b/src/arraytypes/struct.jl @@ -23,7 +23,7 @@ struct Struct{T, S} <: ArrowVector{T} validity::ValidityBitmap data::S # Tuple of ArrowVector ℓ::Int - metadata::Union{Nothing, Dict{String, String}} + metadata::Union{Nothing, Base.ImmutableDict{String,String}} end Base.size(s::Struct) = (s.ℓ,) diff --git a/src/arraytypes/unions.jl b/src/arraytypes/unions.jl index ff929e15..045d7ad7 100644 --- a/src/arraytypes/unions.jl +++ b/src/arraytypes/unions.jl @@ -63,7 +63,7 @@ struct DenseUnion{T, U, S} <: ArrowVector{T} typeIds::Vector{UInt8} offsets::Vector{Int32} data::S # Tuple of ArrowVector - metadata::Union{Nothing, Dict{String, String}} + metadata::Union{Nothing, Base.ImmutableDict{String,String}} end Base.size(s::DenseUnion) = size(s.typeIds) @@ -185,7 +185,7 @@ struct SparseUnion{T, U, S} <: ArrowVector{T} arrow::Vector{UInt8} # need to hold a reference to arrow memory blob typeIds::Vector{UInt8} data::S # Tuple of ArrowVector - metadata::Union{Nothing, Dict{String, String}} + metadata::Union{Nothing, Base.ImmutableDict{String,String}} end Base.size(s::SparseUnion) = size(s.typeIds) diff --git a/src/eltypes.jl b/src/eltypes.jl index c5aa2f7a..0aa6d0bf 100644 --- a/src/eltypes.jl +++ b/src/eltypes.jl @@ -39,7 +39,7 @@ function juliaeltype(f::Meta.Field, ::Nothing, convert::Bool) return convert ? 
finaljuliatype(T) : T end -function juliaeltype(f::Meta.Field, meta::Dict{String, String}, convert::Bool) +function juliaeltype(f::Meta.Field, meta::AbstractDict{String, String}, convert::Bool) TT = juliaeltype(f, convert) !convert && return TT T = finaljuliatype(TT) diff --git a/src/table.jl b/src/table.jl index d6c71f7b..012f9153 100644 --- a/src/table.jl +++ b/src/table.jl @@ -182,19 +182,37 @@ struct Table <: Tables.AbstractColumns columns::Vector{AbstractVector} lookup::Dict{Symbol, AbstractVector} schema::Ref{Meta.Schema} - metadata::Ref{Dict{String, String}} + metadata::Ref{Union{Nothing,Base.ImmutableDict{String,String}}} end -Table() = Table(Symbol[], Type[], AbstractVector[], Dict{Symbol, AbstractVector}(), Ref{Meta.Schema}(), Ref{Dict{String, String}}()) -Table(names, types, columns, lookup, schema) = Table(names, types, columns, lookup, schema, Ref{Dict{String, String}}()) +Table() = Table(Symbol[], Type[], AbstractVector[], Dict{Symbol, AbstractVector}(), Ref{Meta.Schema}(), Ref{Union{Nothing,Base.ImmutableDict{String,String}}}(nothing)) + +function Table(names, types, columns, lookup, schema) + m = isassigned(schema) ? buildmetadata(schema[]) : nothing + return Table(names, types, columns, lookup, schema, Ref{Union{Nothing,Base.ImmutableDict{String,String}}}(m)) +end names(t::Table) = getfield(t, :names) types(t::Table) = getfield(t, :types) columns(t::Table) = getfield(t, :columns) lookup(t::Table) = getfield(t, :lookup) schema(t::Table) = getfield(t, :schema) -getmetadata(t::Table) = isdefined(getfield(t, :metadata), :x) ? getfield(t, :metadata)[] : nothing -setmetadata!(t::Table, m::Dict{String, String}) = (setindex!(getfield(t, :metadata), m); nothing) + +""" + Arrow.getmetadata(x) + +If `x isa Arrow.Table` return a `Base.ImmutableDict{String,String}` representation of `x`'s +`Schema` `custom_metadata`, or `nothing` if no such metadata exists. + +If `x isa Arrow.ArrowVector`, return a `Base.ImmutableDict{String,String}` representation of `x`'s +`Field` `custom_metadata`, or `nothing` if no such metadata exists. + +Otherwise, return `nothing`. + +See [the official Arrow documentation for more details on custom application metadata](https://arrow.apache.org/docs/format/Columnar.html#custom-application-metadata). +""" +getmetadata(t::Table) = getfield(t, :metadata)[] +getmetadata(::Any) = nothing Tables.istable(::Table) = true Tables.columnaccess(::Table) = true @@ -306,10 +324,7 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin lu[nm] = col push!(ty, eltype(col)) end - meta = sch !== nothing ? 
sch.custom_metadata : nothing - if meta !== nothing - getfield(t, :metadata)[] = buildmetadata(meta) - end + getfield(t, :metadata)[] = buildmetadata(sch) return t end @@ -366,10 +381,10 @@ struct VectorIterator convert::Bool end -buildmetadata(f::Meta.Field) = buildmetadata(f.custom_metadata) -buildmetadata(meta) = Dict(String(kv.key) => String(kv.value) for kv in meta) +buildmetadata(f::Union{Meta.Field,Meta.Schema}) = buildmetadata(f.custom_metadata) +buildmetadata(meta) = toidict(String(kv.key) => String(kv.value) for kv in meta) buildmetadata(::Nothing) = nothing -buildmetadata(x::Dict{String, String}) = x +buildmetadata(x::AbstractDict) = x function Base.iterate(x::VectorIterator, (columnidx, nodeidx, bufferidx)=(Int64(1), Int64(1), Int64(1))) columnidx > length(x.schema.fields) && return nothing diff --git a/src/utils.jl b/src/utils.jl index 235efc56..f5c052a3 100644 --- a/src/utils.jl +++ b/src/utils.jl @@ -207,3 +207,14 @@ function tobuffer(data; kwargs...) seekstart(io) return io end + +toidict(x::Base.ImmutableDict) = x + +# ref https://github.com/JuliaData/Arrow.jl/pull/238#issuecomment-919415809 +function toidict(pairs) + dict = Base.ImmutableDict(first(pairs)) + for pair in Iterators.drop(pairs, 1) + dict = Base.ImmutableDict(dict, pair) + end + return dict +end \ No newline at end of file diff --git a/src/write.jl b/src/write.jl index 032238c3..e9d7d1d7 100644 --- a/src/write.jl +++ b/src/write.jl @@ -14,39 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -const OBJ_METADATA_LOCK = Ref{ReentrantLock}() -const OBJ_METADATA = IdDict{Any, Dict{String, String}}() - -""" - Arrow.setmetadata!(x, metadata::Dict{String, String}) - -Set the metadata for any object, provided as a `Dict{String, String}`. -Metadata attached to a table or column will be serialized when written -as a stream or file. -""" -function setmetadata!(x, meta::Dict{String, String}) - lock(OBJ_METADATA_LOCK[]) do - OBJ_METADATA[x] = meta - end - return -end - -""" - Arrow.getmetadata(x) => Dict{String, String} - -Retrieve any metadata (as a `Dict{String, String}`) attached to `x`. - -Metadata may be attached to any object via [`Arrow.setmetadata!`](@ref), -or deserialized via the arrow format directly (the format allows attaching metadata -to table, column, and other objects). - -Note that this function's return value directly aliases `x`'s attached metadata -(i.e. is not a copy of the underlying storage). Any method author that overloads -this function should preserve this behavior so that downstream callers can rely -on this behavior in generic code. -""" -getmetadata(x, default=nothing) = lock(() -> get(OBJ_METADATA, x, default), OBJ_METADATA_LOCK[]) - const DEFAULT_MAX_DEPTH = 6 """ @@ -70,6 +37,7 @@ By default, `Arrow.write` will use multiple threads to write multiple record batches simultaneously (e.g. if julia is started with `julia -t 8` or the `JULIA_NUM_THREADS` environment variable is set). Supported keyword arguments to `Arrow.write` include: + * `colmetadata=nothing`: the metadata that should be written as the table's columns' `custom_metadata` fields; must either be `nothing` or an `AbstractDict` of `column_name::Symbol => column_metadata` where `column_metadata` is an iterable of `<:AbstractString` pairs. 
 * `compress`: possible values include `:lz4`, `:zstd`, or your own initialized `LZ4FrameCompressor` or `ZstdCompressor` objects; will cause all buffers in each record batch to use the respective compression encoding
 * `alignment::Int=8`: specify the number of bytes to align buffers to when written in messages; strongly recommended to only use alignment values of 8 or 64 for modern memory cache line optimization
 * `dictencode::Bool=false`: whether all columns should use dictionary encoding when being written; to dict encode specific columns, wrap the column/array in `Arrow.DictEncode(col)`
@@ -77,6 +45,7 @@ Supported keyword arguments to `Arrow.write` include:
 * `denseunions::Bool=true`: whether Julia `Vector{<:Union}` arrays should be written using the dense union layout; passing `false` will result in the sparse union layout
 * `largelists::Bool=false`: causes list column types to be written with Int64 offset arrays; mainly for testing purposes; by default, Int64 offsets will be used only if needed
 * `maxdepth::Int=$DEFAULT_MAX_DEPTH`: deepest allowed nested serialization level; this is provided by default to prevent accidental infinite recursion with mutually recursive data structures
+ * `metadata=Arrow.getmetadata(tbl)`: the metadata that should be written as the table's schema's `custom_metadata` field; must either be `nothing` or an iterable of `<:AbstractString` pairs.
 * `ntasks::Int`: number of concurrent threaded tasks to allow while writing input partitions out as arrow record batches; default is no limit; to disable multithreaded writing, pass `ntasks=1`
 * `file::Bool=false`: if an `io` argument is being written to, passing `file=true` will cause the arrow file format to be written instead of just IPC streaming
 """
 function write end

 write(io_or_file; kw...) = x -> write(io_or_file, x; kw...)
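# Illustrative usage sketch for the `metadata`/`colmetadata` keyword arguments documented
# above (kept as comments since it sits in module source; the table, file name, and metadata
# keys are hypothetical examples, not part of this patch):
#
#     example = (a = [1, 2, 3], b = ["x", "y", "z"])
#     Arrow.write("example.arrow", example;
#                 metadata = ["writer" => "my-pipeline"],     # schema custom_metadata
#                 colmetadata = Dict(:a => ["unit" => "s"]))  # per-column custom_metadata
#     read_back = Arrow.Table("example.arrow")
#     Arrow.getmetadata(read_back)    # contains "writer" => "my-pipeline"
#     Arrow.getmetadata(read_back.a)  # contains "unit" => "s"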
-function write(filename::String, tbl; largelists::Bool=false, compress::Union{Nothing, Symbol, LZ4FrameCompressor, ZstdCompressor}=nothing, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, alignment::Int=8, maxdepth::Int=DEFAULT_MAX_DEPTH, ntasks=Inf, file::Bool=true) +function write(filename::String, tbl; metadata=getmetadata(tbl), colmetadata=nothing, largelists::Bool=false, compress::Union{Nothing, Symbol, LZ4FrameCompressor, ZstdCompressor}=nothing, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, alignment::Int=8, maxdepth::Int=DEFAULT_MAX_DEPTH, ntasks=Inf, file::Bool=true) open(filename, "w") do io - write(io, tbl, file, largelists, compress, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks) + write(io, tbl, file, largelists, compress, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata) end return filename end -function write(io::IO, tbl; largelists::Bool=false, compress::Union{Nothing, Symbol, LZ4FrameCompressor, ZstdCompressor}=nothing, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, alignment::Int=8, maxdepth::Int=DEFAULT_MAX_DEPTH, ntasks=Inf, file::Bool=false) - return write(io, tbl, file, largelists, compress, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks) +function write(io::IO, tbl; metadata=getmetadata(tbl), colmetadata=nothing, largelists::Bool=false, compress::Union{Nothing, Symbol, LZ4FrameCompressor, ZstdCompressor}=nothing, denseunions::Bool=true, dictencode::Bool=false, dictencodenested::Bool=false, alignment::Int=8, maxdepth::Int=DEFAULT_MAX_DEPTH, ntasks=Inf, file::Bool=false) + return write(io, tbl, file, largelists, compress, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata) end -function write(io, source, writetofile, largelists, compress, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks) +function write(io, source, writetofile, largelists, compress, denseunions, dictencode, dictencodenested, alignment, maxdepth, ntasks, meta, colmeta) if ntasks < 1 throw(ArgumentError("ntasks keyword argument must be > 0; pass `ntasks=1` to disable multithreaded writing")) end @@ -137,7 +106,7 @@ function write(io, source, writetofile, largelists, compress, denseunions, dicte @debug 1 "processing table partition i = $i" tblcols = Tables.columns(tbl) if i == 1 - cols = toarrowtable(tblcols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth) + cols = toarrowtable(tblcols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, meta, colmeta) sch[] = Tables.schema(cols) firstcols[] = cols put!(msgs, makeschemamsg(sch[], cols), i) @@ -153,9 +122,9 @@ function write(io, source, writetofile, largelists, compress, denseunions, dicte put!(msgs, makerecordbatchmsg(sch[], cols, alignment), i, true) else if threaded - Threads.@spawn process_partition(tblcols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror) + Threads.@spawn process_partition(tblcols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta) else - @async process_partition(tblcols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror) + @async 
process_partition(tblcols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta) end end end @@ -209,9 +178,9 @@ function write(io, source, writetofile, largelists, compress, denseunions, dicte return io end -function process_partition(cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror) +function process_partition(cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta) try - cols = toarrowtable(cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth) + cols = toarrowtable(cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, meta, colmeta) if !isempty(cols.dictencodingdeltas) for de in cols.dictencodingdeltas dictsch = Tables.Schema((:col,), (eltype(de.data),)) @@ -229,13 +198,12 @@ end struct ToArrowTable sch::Tables.Schema cols::Vector{Any} - metadata::Union{Nothing, Dict{String, String}} + metadata::Union{Nothing,Base.ImmutableDict{String,String}} dictencodingdeltas::Vector{DictEncoding} end -function toarrowtable(cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth) +function toarrowtable(cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, meta, colmeta) @debug 1 "converting input table to arrow formatted columns" - meta = getmetadata(cols) sch = Tables.schema(cols) types = collect(sch.types) N = length(types) @@ -243,12 +211,15 @@ function toarrowtable(cols, dictencodings, largelists, compress, denseunions, di newtypes = Vector{Type}(undef, N) dictencodingdeltas = DictEncoding[] Tables.eachcolumn(sch, cols) do col, i, nm - newcol = toarrowvector(col, i, dictencodings, dictencodingdeltas; compression=compress, largelists=largelists, denseunions=denseunions, dictencode=dictencode, dictencodenested=dictencodenested, maxdepth=maxdepth) + oldcolmeta = getmetadata(col) + newcolmeta = isnothing(colmeta) ? oldcolmeta : get(colmeta, nm, oldcolmeta) + newcol = toarrowvector(col, i, dictencodings, dictencodingdeltas, newcolmeta; compression=compress, largelists=largelists, denseunions=denseunions, dictencode=dictencode, dictencodenested=dictencodenested, maxdepth=maxdepth) newtypes[i] = eltype(newcol) newcols[i] = newcol end minlen, maxlen = isempty(newcols) ? 
(0, 0) : extrema(length, newcols) minlen == maxlen || throw(ArgumentError("columns with unequal lengths detected: $minlen < $maxlen")) + meta = _normalizemeta(meta) return ToArrowTable(Tables.Schema(sch.names, newtypes), newcols, meta, dictencodingdeltas) end diff --git a/test/runtests.jl b/test/runtests.jl index 162c8682..434a8cfc 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -120,15 +120,24 @@ tt = Arrow.Table(Arrow.tobuffer(tt; dictencode=true, dictencodenested=true)) t = (col1=Int64[1,2,3,4,5,6,7,8,9,10],) meta = Dict("key1" => "value1", "key2" => "value2") -Arrow.setmetadata!(t, meta) meta2 = Dict("colkey1" => "colvalue1", "colkey2" => "colvalue2") -Arrow.setmetadata!(t.col1, meta2) -tt = Arrow.Table(Arrow.tobuffer(t)) +tt = Arrow.Table(Arrow.tobuffer(t; colmetadata=Dict(:col1 => meta2), metadata=meta)) @test length(tt) == length(t) @test tt.col1 == t.col1 @test eltype(tt.col1) === Int64 -@test Arrow.getmetadata(tt) == meta -@test Arrow.getmetadata(tt.col1) == meta2 +@test Arrow.getmetadata(tt) == Arrow.toidict(meta) +@test Arrow.getmetadata(tt.col1) == Arrow.toidict(meta2) + +t = (col1=collect(1:10), col2=collect('a':'j'), col3=collect(1:10)) +meta = ("key1" => :value1, :key2 => "value2") +meta2 = ("colkey1" => :colvalue1, :colkey2 => "colvalue2") +meta3 = ("colkey3" => :colvalue3,) +tt = Arrow.Table(Arrow.tobuffer(t; colmetadata=Dict(:col2 => meta2, :col3 => meta3), metadata=meta)) +@test Arrow.getmetadata(tt) == Arrow.toidict(String(k) => String(v) for (k, v) in meta) +@test Arrow.getmetadata(tt.col1) === nothing +@test Arrow.getmetadata(tt.col2)["colkey1"] == "colvalue1" +@test Arrow.getmetadata(tt.col2)["colkey2"] == "colvalue2" +@test Arrow.getmetadata(tt.col3)["colkey3"] == "colvalue3" # custom compressors lz4 = Arrow.CodecLz4.LZ4FrameCompressor(; compressionlevel=8) @@ -296,12 +305,6 @@ ArrowTypes.JuliaType(::Val{:CustomStruct2}, S, meta) = CustomStruct2{Symbol(meta tbl = Arrow.Table(Arrow.tobuffer(t)) @test eltype(tbl.col1) == CustomStruct2{:hey} -# 170 -tbl = Arrow.Table(Arrow.tobuffer((x = [1,2,3],))) -m = Dict("a" => "b") -Arrow.setmetadata!(tbl, m) -@test Arrow.getmetadata(tbl) === m - # 166 t = ( col1=[zero(Arrow.Timestamp{Arrow.Meta.TimeUnit.NANOSECOND, nothing})], @@ -373,7 +376,7 @@ end # 2-arg show with metadata big_dict = Dict((randstring(rand(5:10)) => randstring(rand(1:3)) for _ = 1:100)) - Arrow.setmetadata!(arrow_table, big_dict) + arrow_table = Arrow.Table(Arrow.tobuffer(table; metadata=big_dict)) str2 = sprint(show, arrow_table) @test length(str2) > length(str) @test length(str2) < 200
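# Additional illustrative checks (a sketch, not part of this patch); assumes the same test
# environment as above (`Arrow` and `Test` loaded, plus the internal `Arrow.tobuffer` helper).
# As in the tests above, `metadata`/`colmetadata` accept any iterable of pairs, keys and
# values are normalized to `String`, and plain Julia values no longer carry attached metadata.
t = (col1 = collect(1:3),)
buf = Arrow.tobuffer(t; metadata = ("key" => :value,), colmetadata = Dict(:col1 => ("unit" => :m,)))
tt = Arrow.Table(buf)
@test Arrow.getmetadata(tt)["key"] == "value"
@test Arrow.getmetadata(tt.col1)["unit"] == "m"
@test Arrow.getmetadata(t) === nothing  # the global Arrow.setmetadata! cache is gone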