Skip to content
This repository has been archived by the owner on Jul 17, 2019. It is now read-only.

Move data tables dependencies to respective packages #28

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions REQUIRE
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
julia 0.5
DataFrames
NullableArrays 0.0.9
CategoricalArrays 0.0.5
WeakRefStrings 0.1.3
6 changes: 4 additions & 2 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ Packages can have a single julia type implement both the `Data.Source` and `Data
`Data.Source` implementations:
* [`CSV.Source`](https://github.com/JuliaData/CSV.jl/blob/master/src/Source.jl)
* [`SQLite.Source`](https://github.com/JuliaDB/SQLite.jl/blob/master/src/Source.jl)
* [`DataFrame`](https://github.com/JuliaData/DataStreams.jl/blob/master/src/DataStreams.jl#L251)
* [`DataFrame`](https://github.com/JuliaStats/DataFrames.jl/blob/master/src/abstractdataframe/io.jl)
* [`DataTable`](https://github.com/JuliaData/DataTables.jl/blob/master/src/abstractdatatable/io.jl)
* [`ODBC.Source`](https://github.com/JuliaDB/ODBC.jl/blob/master/src/Source.jl)

`Data.Sink` implementations:
* [`CSV.Sink`](https://github.com/JuliaData/CSV.jl/blob/master/src/Sink.jl)
* [`SQLite.Sink`](https://github.com/JuliaDB/SQLite.jl/blob/master/src/Sink.jl)
* [`DataFrame`](https://github.com/JuliaData/DataStreams.jl/blob/master/src/DataStreams.jl#L287)
* [`DataFrame`](https://github.com/JuliaStats/DataFrames.jl/blob/master/src/abstractdataframe/io.jl)
* [`DataTable`](https://github.com/JuliaData/DataTables.jl/blob/master/src/abstractdatatable/io.jl)
* [`ODBC.Sink`](https://github.com/JuliaDB/ODBC.jl/blob/master/src/Sink.jl)

## `Data.Source` Interface
Expand Down
116 changes: 1 addition & 115 deletions src/DataStreams.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__precompile__(true)
module DataStreams

export Data, DataFrame
export Data

module Data

Expand Down Expand Up @@ -242,120 +242,6 @@ function Data.stream!{T1, T2}(source::T1, ::Type{Data.Column}, sink::T2, source_
return sink
end

# DataFrames DataStreams definitions
using DataFrames, NullableArrays, CategoricalArrays, WeakRefStrings

# DataFrames DataStreams implementation
function Data.schema(df::DataFrame, ::Type{Data.Column})
return Data.Schema(map(string, names(df)),
DataType[typeof(A) for A in df.columns], size(df, 1))
end

# DataFrame as a Data.Source
function Data.isdone(source::DataFrame, row, col)
rows, cols = size(source)
return row > rows || col > cols
end

Data.streamtype(::Type{DataFrame}, ::Type{Data.Column}) = true
Data.streamtype(::Type{DataFrame}, ::Type{Data.Field}) = true

Data.streamfrom{T <: AbstractVector}(source::DataFrame, ::Type{Data.Column}, ::Type{T}, col) = (@inbounds A = source.columns[col]::T; return A)
Data.streamfrom{T}(source::DataFrame, ::Type{Data.Column}, ::Type{T}, col) = (@inbounds A = source.columns[col]; return A)
Data.streamfrom{T}(source::DataFrame, ::Type{Data.Field}, ::Type{T}, row, col) = (@inbounds A = Data.streamfrom(source, Data.Column, T, col); return A[row]::T)

# DataFrame as a Data.Sink
allocate{T}(::Type{T}, rows, ref) = Array{T}(rows)
allocate{T}(::Type{Vector{T}}, rows, ref) = Array{T}(rows)

allocate{T}(::Type{Nullable{T}}, rows, ref) = NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref)
allocate{T}(::Type{NullableVector{T}}, rows, ref) = NullableArray{T, 1}(Array{T}(rows), fill(true, rows), isempty(ref) ? UInt8[] : ref)

allocate{S,R}(::Type{CategoricalArrays.CategoricalValue{S,R}}, rows, ref) = CategoricalArray{S,1,R}(rows)
allocate{S,R}(::Type{CategoricalVector{S,R}}, rows, ref) = CategoricalArray{S,1,R}(rows)

allocate{S,R}(::Type{Nullable{CategoricalArrays.CategoricalValue{S,R}}}, rows, ref) = NullableCategoricalArray{S,1,R}(rows)
allocate{S,R}(::Type{NullableCategoricalVector{S,R}}, rows, ref) = NullableCategoricalArray{S,1,R}(rows)

if isdefined(Main, :DataArray)
allocate{T}(::Type{DataVector{T}}, rows, ref) = DataArray{T}(rows)
end

function DataFrame{T <: Data.StreamType}(sch::Data.Schema, ::Type{T}=Data.Field, append::Bool=false, ref::Vector{UInt8}=UInt8[], args...)
rows, cols = size(sch)
rows = max(0, T <: Data.Column ? 0 : rows) # don't pre-allocate for Column streaming
columns = Vector{Any}(cols)
types = Data.types(sch)
for i = 1:cols
columns[i] = allocate(types[i], rows, ref)
end
return DataFrame(columns, map(Symbol, Data.header(sch)))
end

# given an existing DataFrame (`sink`), make any necessary changes for streaming source
# with Data.Schema `sch` to it, given we know if we'll be `appending` or not
function DataFrame(sink, sch::Data.Schema, ::Type{Field}, append::Bool, ref::Vector{UInt8})
rows, cols = size(sch)
newsize = max(0, rows) + (append ? size(sink, 1) : 0)
# need to make sure we don't break a NullableVector{WeakRefString{UInt8}} when appending
if append
for (i, T) in enumerate(Data.types(sch))
if T <: Nullable{WeakRefString{UInt8}}
sink.columns[i] = NullableArray(String[string(get(x, "")) for x in sink.columns[i]])
sch.types[i] = Nullable{String}
end
end
end
newsize != size(sink, 1) && foreach(x->resize!(x, newsize), sink.columns)
sch.rows = newsize
return sink
end
function DataFrame(sink, sch::Schema, ::Type{Column}, append::Bool, ref::Vector{UInt8})
rows, cols = size(sch)
append ? (sch.rows += size(sink, 1)) : foreach(empty!, sink.columns)
return sink
end

Data.streamtypes(::Type{DataFrame}) = [Data.Column, Data.Field]

Data.streamto!{T}(sink::DataFrame, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::Vector{T}, val)
Data.streamto!{T}(sink::DataFrame, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::NullableVector{T}, val)
Data.streamto!{T, R}(sink::DataFrame, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::CategoricalVector{T, R}, val)
Data.streamto!{T, R}(sink::DataFrame, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{false}) = push!(sink.columns[col]::NullableCategoricalVector{T, R}, val)

Data.streamto!{T}(sink::DataFrame, ::Type{Data.Field}, val::T, row, col, sch::Data.Schema{true}) = (sink.columns[col]::Vector{T})[row] = val
Data.streamto!{T}(sink::DataFrame, ::Type{Data.Field}, val::Nullable{T}, row, col, sch::Data.Schema{true}) = (sink.columns[col]::NullableVector{T})[row] = val
Data.streamto!(sink::DataFrame, ::Type{Data.Field}, val::Nullable{WeakRefString{UInt8}}, row, col, sch::Data.Schema{true}) = (sink.columns[col][row] = val)
Data.streamto!{T, R}(sink::DataFrame, ::Type{Data.Field}, val::CategoricalValue{T, R}, row, col, sch::Data.Schema{true}) = (sink.columns[col]::CategoricalVector{T, R})[row] = val
Data.streamto!{T, R}(sink::DataFrame, ::Type{Data.Field}, val::Nullable{CategoricalValue{T, R}}, row, col, sch::Data.Schema{true}) = (sink.columns[col]::NullableCategoricalVector{T, R})[row] = val

function Data.streamto!{T}(sink::DataFrame, ::Type{Data.Column}, column::T, row, col, sch::Data.Schema)
if row == 0
sink.columns[col] = column
else
append!(sink.columns[col]::T, column)
end
return length(column)
end

function Base.append!{T}(dest::NullableVector{WeakRefString{T}}, column::NullableVector{WeakRefString{T}})
offset = length(dest.values)
parentoffset = length(dest.parent)
append!(dest.isnull, column.isnull)
append!(dest.parent, column.parent)
# appending new data to `dest` would invalidate all existing WeakRefString pointers
resize!(dest.values, length(dest) + length(column))
for i = 1:offset
old = dest.values[i]
dest.values[i] = WeakRefString{T}(pointer(dest.parent, old.ind), old.len, old.ind)
end
for i = 1:length(column)
old = column.values[i]
dest.values[offset + i] = WeakRefString{T}(pointer(dest.parent, parentoffset + old.ind), old.len, parentoffset + old.ind)
end
return length(dest)
end

end # module Data

end # module DataStreams
4 changes: 4 additions & 0 deletions test/REQUIRE
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DataTables
NullableArrays 0.0.9
CategoricalArrays 0.0.5
WeakRefStrings 0.1.3
6 changes: 3 additions & 3 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Base.Test, DataStreams, DataFrames, NullableArrays
using Base.Test, DataStreams, DataTables, NullableArrays

if !isdefined(Core, :String)
typealias String UTF8String
Expand Down Expand Up @@ -33,12 +33,12 @@ TableSink(s::Data.Source) = TableSink(schema(s))

import DataStreams.Data.stream!

function Data.stream!(src::DataFrame, snk::TableSink)
function Data.stream!(src::DataTable, snk::TableSink)
# TODO: this could be improved considering different source Schema.
snk.data = src.columns
end

src_tb = DataFrame()
src_tb = DataTable()
snk = TableSink(sch)

snk_tb = Data.stream!(src_tb, snk)
Expand Down