Skip to content
This repository has been archived by the owner on Jul 17, 2019. It is now read-only.

Move data tables dependencies to respective packages #28

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions REQUIRE
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
julia 0.5
DataFrames
NullableArrays 0.0.9
CategoricalArrays 0.0.5
WeakRefStrings 0.1.3
3 changes: 3 additions & 0 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ Packages can have a single julia type implement both the `Data.Source` and `Data
* [`CSV.Source`](https://github.com/JuliaData/CSV.jl/blob/master/src/Source.jl)
* [`SQLite.Source`](https://github.com/JuliaDB/SQLite.jl/blob/master/src/Source.jl)
* [`DataFrame`](https://github.com/JuliaData/DataStreams.jl/blob/master/src/DataStreams.jl#L251)
* [`DataTables`](https://github.com/JuliaData/DataStreams.jl/blob/master/src/DataStreams.jl#L251)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These will be wrong until we have a new url to link to

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use a less specific link when pointing to a different repository, else it's going to be outdated quite fast. Just point to the package homepage? Or maybe to the file?

* [`ODBC.Source`](https://github.com/JuliaDB/ODBC.jl/blob/master/src/Source.jl)

`Data.Sink` implementations:
* [`CSV.Sink`](https://github.com/JuliaData/CSV.jl/blob/master/src/Sink.jl)
* [`SQLite.Sink`](https://github.com/JuliaDB/SQLite.jl/blob/master/src/Sink.jl)
* [`DataFrame`](https://github.com/JuliaData/DataStreams.jl/blob/master/src/DataStreams.jl#L287)
* [`DataTables`](https://github.com/JuliaData/DataStreams.jl/blob/master/src/DataStreams.jl#L287)
* [`DataTable`](https://github.com/JuliaData/DataStreams.jl/blob/master/src/DataStreams.jl#L287)
* [`ODBC.Sink`](https://github.com/JuliaDB/ODBC.jl/blob/master/src/Sink.jl)

## `Data.Source` Interface
Expand Down
116 changes: 1 addition & 115 deletions src/DataStreams.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__precompile__(true)
module DataStreams

export Data, DataFrame
export Data

module Data

Expand Down Expand Up @@ -242,120 +242,6 @@ function Data.stream!{T1, T2}(source::T1, ::Type{Data.Column}, sink::T2, source_
return sink
end

# DataFrames DataStreams definitions
using DataFrames, NullableArrays, CategoricalArrays, WeakRefStrings

# DataFrames DataStreams implementation
function Data.schema(df::DataFrame, ::Type{Data.Column})
return Data.Schema(map(string, names(df)),
DataType[typeof(A) for A in df.columns], size(df, 1))
end

# DataFrame as a Data.Source
function Data.isdone(source::DataFrame, row, col)
rows, cols = size(source)
return row > rows || col > cols
end

Data.streamtype(::Type{DataFrame}, ::Type{Data.Column}) = true
Data.streamtype(::Type{DataFrame}, ::Type{Data.Field}) = true

Data.streamfrom{T <: AbstractVector}(source::DataFrame, ::Type{Data.Column}, ::Type{T}, col) = (@inbounds A = source.columns[col]::T; return A)
Data.streamfrom{T}(source::DataFrame, ::Type{Data.Column}, ::Type{T}, col) = (@inbounds A = source.columns[col]; return A)
Data.streamfrom{T}(source::DataFrame, ::Type{Data.Field}, ::Type{T}, row, col) = (@inbounds A = Data.streamfrom(source, Data.Column, T, col); return A[row]::T)

# DataFrame as a Data.Sink
allocate{T}(::Type{T}, rows, ref) = Array{T}(rows)
allocate{T}(::Type{Vector{T}}, rows, ref) = Array{T}(rows)

allocate{T}(::Type{Nullable{T}}, rows, ref) = NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref)
allocate{T}(::Type{NullableVector{T}}, rows, ref) = NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref)

allocate{S,R}(::Type{CategoricalArrays.CategoricalValue{S,R}}, rows, ref) = CategoricalArray{S,1,R}(rows)
allocate{S,R}(::Type{CategoricalVector{S,R}}, rows, ref) = CategoricalArray{S,1,R}(rows)

allocate{S,R}(::Type{Nullable{CategoricalArrays.CategoricalValue{S,R}}}, rows, ref) = NullableCategoricalArray{S,1,R}(rows)
allocate{S,R}(::Type{NullableCategoricalVector{S,R}}, rows, ref) = NullableCategoricalArray{S,1,R}(rows)

if isdefined(Main, :DataArray)
allocate{T}(::Type{DataVector{T}}, rows, ref) = DataArray{T}(rows)
end

function DataFrame{T <: Data.StreamType}(sch::Data.Schema, ::Type{T}=Data.Field, append::Bool=false, ref::Vector{UInt8}=UInt8[], args...)
rows, cols = size(sch)
rows = max(0, T <: Data.Column ? 0 : rows) # don't pre-allocate for Column streaming
columns = Vector{Any}(cols)
types = Data.types(sch)
for i = 1:cols
columns[i] = allocate(types[i], rows, ref)
end
return DataFrame(columns, map(Symbol, Data.header(sch)))
end

# given an existing DataFrame (`sink`), make any necessary changes for streaming source
# with Data.Schema `sch` to it, given we know if we'll be `appending` or not
function DataFrame(sink, sch::Data.Schema, ::Type{Field}, append::Bool, ref::Vector{UInt8})
rows, cols = size(sch)
newsize = max(0, rows) + (append ? size(sink, 1) : 0)
# need to make sure we don't break a NullableVector{WeakRefString{UInt8}} when appending
if append
for (i, T) in enumerate(Data.types(sch))
if T <: Nullable{WeakRefString{UInt8}}
sink.columns[i] = NullableArray(String[string(get(x, "")) for x in sink.columns[i]])
sch.types[i] = Nullable{String}
end
end
end
newsize != size(sink, 1) && foreach(x->resize!(x, newsize), sink.columns)
sch.rows = newsize
return sink
end
function DataFrame(sink, sch::Schema, ::Type{Column}, append::Bool, ref::Vector{UInt8})
rows, cols = size(sch)
append ? (sch.rows += size(sink, 1)) : foreach(empty!, sink.columns)
return sink
end

Data.streamtypes(::Type{DataFrame}) = [Data.Column, Data.Field]

Data.streamto!{T}(sink::DataFrame, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::Vector{T}, val)
Data.streamto!{T}(sink::DataFrame, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::NullableVector{T}, val)
Data.streamto!{T, R}(sink::DataFrame, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::CategoricalVector{T, R}, val)
Data.streamto!{T, R}(sink::DataFrame, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::NullableCategoricalVector{T, R}, val)

Data.streamto!{T}(sink::DataFrame, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{true}) = (sink.columns[col]::Vector{T})[row] = val
Data.streamto!{T}(sink::DataFrame, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{true}) = (sink.columns[col]::NullableVector{T})[row] = val
Data.streamto!(sink::DataFrame, ::Type{Data.Field}, val::Nullable{WeakRefString{UInt8}}, row, col, sch::Data.Schema{true}) = (sink.columns[col][row] = val)
Data.streamto!{T, R}(sink::DataFrame, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{true}) = (sink.columns[col]::CategoricalVector{T, R})[row] = val
Data.streamto!{T, R}(sink::DataFrame, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{true}) = (sink.columns[col]::NullableCategoricalVector{T, R})[row] = val

function Data.streamto!{T}(sink::DataFrame, ::Type{Data.Column}, column::T, row, col, sch::Data.Schema)
if row == 0
sink.columns[col] = column
else
append!(sink.columns[col]::T, column)
end
return length(column)
end

function Base.append!{T}(dest::NullableVector{WeakRefString{T}}, column::NullableVector{WeakRefString{T}})
offset = length(dest.values)
parentoffset = length(dest.parent)
append!(dest.isnull, column.isnull)
append!(dest.parent, column.parent)
# appending new data to `dest` would invalid all existing WeakRefString pointers
resize!(dest.values, length(dest) + length(column))
for i = 1:offset
old = dest.values[i]
dest.values[i] = WeakRefString{T}(pointer(dest.parent, old.ind), old.len, old.ind)
end
for i = 1:length(column)
old = column.values[i]
dest.values[offset + i] = WeakRefString{T}(pointer(dest.parent, parentoffset + old.ind), old.len, parentoffset + old.ind)
end
return length(dest)
end

end # module Data

end # module DataStreams
4 changes: 4 additions & 0 deletions test/REQUIRE
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DataTables
NullableArrays 0.0.9
CategoricalArrays 0.0.5
WeakRefStrings 0.1.3
6 changes: 3 additions & 3 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Base.Test, DataStreams, DataFrames, NullableArrays
using Base.Test, DataStreams, DataTables, NullableArrays

if !isdefined(Core, :String)
typealias String UTF8String
Expand Down Expand Up @@ -33,12 +33,12 @@ TableSink(s::Data.Source) = TableSink(schema(s))

import DataStreams.Data.stream!

function Data.stream!(src::DataFrame, snk::TableSink)
function Data.stream!(src::DataTable, snk::TableSink)
# TODO: this could be improved considering different source Schema.
snk.data = src.columns
end

src_tb = DataFrame()
src_tb = DataTable()
snk = TableSink(sch)

snk_tb = Data.stream!(src_tb, snk)
Expand Down