diff --git a/.travis.yml b/.travis.yml index 2e481be..fecd6d5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,18 +4,22 @@ language: go # Versions of go that are explicitly supported by gonum plus go tip. go: - - 1.9.x - 1.10.x - 1.11.x - master matrix: + fast_finish: true allow_failures: - go: master before_install: # Required for format check. - go get golang.org/x/tools/cmd/goimports + # Required for imports check. + - go get gonum.org/v1/tools/cmd/check-imports + # Required for copyright header check. + - go get gonum.org/v1/tools/cmd/check-copyright # Required for coverage. - go get golang.org/x/tools/cmd/cover - go get github.com/mattn/goveralls @@ -25,14 +29,16 @@ go_import_path: gonum.org/v1/exp # Get deps, build, test, and ensure the code is gofmt'ed. # If we are building as gonum, then we have access to the coveralls api key, so we can run coverage as well. script: + - ${TRAVIS_BUILD_DIR}/.travis/check-copyright.sh - ${TRAVIS_BUILD_DIR}/.travis/check-formatting.sh - go get -d -t -v ./... - go build -v ./... - go test -v ./... - - go test -a -tags bounds -x -v ./... - - go test -a -tags noasm -x -v ./... - - go test -a -tags appengine -x -v ./... + - go test -a -tags bounds -v ./... + - go test -a -tags noasm -v ./... + - go test -a -tags appengine -v ./... - if [[ $TRAVIS_SECURE_ENV_VARS = "true" ]]; then bash ./.travis/test-coverage.sh; fi + - ${TRAVIS_BUILD_DIR}/.travis/check-imports.sh # This is run last since it alters the tree. - ${TRAVIS_BUILD_DIR}/.travis/check-generate.sh diff --git a/.travis/check-copyright.sh b/.travis/check-copyright.sh new file mode 100755 index 0000000..209dca3 --- /dev/null +++ b/.travis/check-copyright.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +set -e +check-copyright -notice "Copyright ©20[0-9]{2} The Gonum Authors\. All rights reserved\." diff --git a/.travis/check-imports.sh b/.travis/check-imports.sh new file mode 100755 index 0000000..f88fc5a --- /dev/null +++ b/.travis/check-imports.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +set -e +check-imports -b "math/rand,github.com/gonum/.*" diff --git a/.travis/test-coverage.sh b/.travis/test-coverage.sh index 1415518..ab31ab9 100644 --- a/.travis/test-coverage.sh +++ b/.travis/test-coverage.sh @@ -13,7 +13,7 @@ testCover() { # switch to the directory to check pushd $d > /dev/null # create the coverage profile - coverageresult=`go test -v -coverprofile=$PROFILE_OUT` + coverageresult=`go test -v $TAGS -coverprofile=$PROFILE_OUT` # output the result so we can check the shell output echo ${coverageresult} # append the results to acc.out if coverage didn't fail, else set the retval to 1 (failed) @@ -27,8 +27,8 @@ testCover() { # Init acc.out echo "mode: set" > $ACC_OUT -# Run test coverage on all directories containing go files except testlapack and testblas. -find . -type d -not -path '*testlapack*' -and -not -path '*testblas*' | while read d; do testCover $d || exit; done +# Run test coverage on all directories containing go files except testlapack testblas and testgraph. +find . -type d -not -path '*testlapack*' -and -not -path '*testblas*' -and -not -path '*testgraph*' | while read d; do testCover $d || exit; done # Upload the coverage profile to coveralls.io [ -n "$COVERALLS_TOKEN" ] && goveralls -coverprofile=$ACC_OUT -service=travis-ci -repotoken $COVERALLS_TOKEN diff --git a/README.md b/README.md index d5d7952..5967eeb 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Gonum exp [![Build Status](https://travis-ci.org/gonum/exp.svg?branch=master)](https://travis-ci.org/gonum/exp) +# Gonum exp [![Build Status](https://travis-ci.org/gonum/exp.svg?branch=master)](https://travis-ci.org/gonum/exp) [![Coverage Status](https://coveralls.io/repos/gonum/exp/badge.svg?branch=master&service=github)](https://coveralls.io/github/gonum/exp?branch=master) [![GoDoc](https://godoc.org/gonum.org/v1/exp?status.svg)](https://godoc.org/gonum.org/v1/exp) [![Go Report Card](https://goreportcard.com/badge/github.com/gonum/exp)](https://goreportcard.com/report/github.com/gonum/exp) ## Issues diff --git a/dframe/README.md b/dframe/README.md new file mode 100644 index 0000000..b52549c --- /dev/null +++ b/dframe/README.md @@ -0,0 +1,184 @@ +# dframe + +`dframe` is a work-in-progress [Data Frame](https://en.wikipedia.org/wiki/Pandas_%28software%29) a-la [pandas](https://pandas.pydata.org/pandas-docs/stable/index.html). + +`dframe` is leveraging [Apache Arrow](https://arrow.apache.org/) and its [Go backend](https://godoc.org/github.com/apache/arrow/go/arrow). + +## Proposal + +We propose to introduce a new `Frame` type inside the `dframe` package: a 2-dim data structure to handle: + +- tabular data with heterogeneous columns (like a `SQL` table) +- arbitrary matrix data with row and column labels +- any other form of observational/statistical dataset. + +For a good cross-pollination and integration with the Gonum and Go scientific ecosystem, it is expected for other "companion" packages tailored for a few focused operations to appear: + +- integration with `gonum/plot`, +- integration with `gonum/stat`, +- integration with `gonum/mat` (_e.g.:_ creation of `dframe.Frame`s from `gonum/mat.Vector` or `gonum/mat.Matrix`, and vice versa) +- `hdf5` loading/saving of `dframe.Frame`s, +- integration with `encoding/csv` or `npyio`, +- integration with `database/sql`, +- etc... + +### Previous work + +The data frame concept comes from `R`'s `data.frame` and Python's `pandas.DataFrame`: + +- https://www.rdocumentation.org/packages/base/versions/3.4.3/topics/data.frame +- https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html + +A few data frame-like implementations in Go have also been investigated: + +- [kniren/gota](https://github.com/kniren/gota) +- [tobgu/qframe](https://github.com/tobgu/qframe) + +Some inspiration from this previous body of work will be drawn, both in terms of API and performance hindsight. + +### dframe + +The main type should be: + +```go +package dframe + +type Frame struct { + // contains filtered or unexported fields +} + +// Err returns the first error encountered during operations on a Frame. +func (df *Frame) Err() error { ... } + +// NumRows returns the number of rows of this Frame. +func (df *Frame) NumRows() int { ... } + +// NumCols returns the number of columns of this Frame. +func (df *Frame) NumCols() int { ... } + +// Column returns the i-th column of this Frame. +func (df *Frame) Column(i int) *array.Column { ... } + +// ColumnNames returns the list of column names of this Frame. +func (df *Frame) ColumnNames() []string { ... } +``` + +It is expected to build `dframe.Frame` on top of the `arrow/array.Interface`. +Leveraging [Arrow](https://arrow.apache.org) for `dframe` enables interoperability with many analysis frameworks, possibly written in other languages than Go. +Arrow arrays are well specified: their memory layout is standardized and the IPC mechanism to send or receive them over the wire is also specified. +This increases the confidence the data we are writing or the analysis pipelines we build with Arrow could be migrated to something else (another language, another framework) if the need should arise. +The Go Arrow package is not feature complete yet with regard to the other language implementations (C++, Java.) +However, the Go implementation already ships with SIMD optimized operations and has the infrastructure for zero-copy support. + +`tobgu/qframe` presents a `QFrame` type that is essentially immutable. +Operations on a `QFrame`, such as copying columns, dropping columns, sorting them or applying some kind of operation on columns, return a new `QFrame`, leaving the original untouched. + +Arrow uses a ref-counting mechanism for all the types that involve memory allocation (mainly to address workloads involving memory allocated on a GPGPU, by a SQL database or a mmap-file.) +This ref-counting mechanism is presented to the user as a pair of methods `Retain`/`Release` that increment and decrement that reference count. +It would seem this mechanism prevents from exposing an API with "chained methods": + +```go +o := df.Slice(0, 10).Select("col1", "col2").Apply("col1 + col2") +``` +Each intermediate `Frame` -- the one returned by `Slice`, the one returned by `Select`, ... -- would be "leaked" as it is missing a call to `Release()` to correctly decrement its reference count. +If we want an immutable `Frame` -- without leaking memory, the code above should instead be rewritten as: + +```go +sli := df.Slice(0, 10) +defer sli.Release() + +sel := sli.Select("col1", "col2") +defer sel.Release() + +o := sel.Apply("col1 + col2") +defer o.Release() +``` +It is not clear (to me!) yet whether an immutable `Frame` makes much sense in Go and with this ref-counting mechanism coming from Arrow. + +However, introducing a `dframe.Tx` transaction could tackle the memory leak. +One can achieve the above goal if one only allows modifications of the underlying `Frame` through a transaction, where all operations are applied to a single temporary `Frame`: + +```go +// Exec runs the provided function inside an atomic read/write transaction, +// applied on this Frame. +func (df *Frame) Exec(f func(tx *Tx) error) error { ... } + +func example(df *dframe.Frame) { + err := df.Exec(func(tx *dframe.Tx) error { + tx.Slice(0, 10).Select("col1", "col2").Apply("col1 + col2") + return nil + }) + if err != nil { + log.Fatal(err) + } +} +``` + +Or, without a "chained methods" API: + +```go +func example(df *dframe.Frame) { + err := df.Exec(func(tx *dframe.Tx) error { + tx.Slice(0, 10) + tx.Select("col1", "col2") + tx.Apply("col1 + col2") + return nil + }) + if err != nil { + log.Fatal(err) + } +} +``` +Introducing a transaction has another nice feature: if the set of operations fails for some reason, one can rollback to the original state of the `Frame`. + +Finally, with a transaction context, one can build some kind of AST of operations that should be applied to a `Frame` and optionally optimize it behind the scene as one knows the complete set of operations to be carried. + +```go +// Open opens an already existing Frame using the provided driver technology, +// located at the provided source. +// +// Possible drivers: hdf5, npyio, csv, json, hdfs, spark, sql, ... +func Open(drv, src string) (*Frame, error) { ... } + +// Create creates a new Frame, using the provided driver technology +func Create(drv, dst string, schema *arrow.Schema, opts ...Option) (*Frame, error) { ... } + +// New creates a new in-memory data frame with the provided memory schema. +func New(schema *arrow.Schema, opts ...Option) (*Frame, error) { ... } + +// FromMem creates a new data frame from the provided in-memory data. +func FromMem(dict Dict, opts ...Option) (*Frame, error) { ... } + +// FromArrays creates a new data frame from the provided schema and arrays. +func FromArrays(schema *arrow.Schema, arrs []array.Interface, opts ...Option) (*Frame, error) { ... } + +// FromCols creates a new data frame from the provided schema and columns. +func FromCols(cols []array.Column, opts ...Option) (*Frame, error) { ... } + +// FromTable creates a new data frame from the provided arrow table. +func FromTable(tbl array.Table, opts ...Option) (*Frame, error) { ... } + +// FromFrame returns a new data frame created by applying the provided +// transaction on the provided frame. +func FromFrame(df *Frame, f func(tx *Tx) error) (*Frame, error) { ... } + +// Exec runs the provided function inside an atomic read/write transaction, +// applied on this Frame. +func (df *Frame) Exec(f func(tx *Tx) error) error { ... } + +// RExec runs the provided function inside an atomic read-only transaction, +// applied on this Frame. +func (df *Frame) RExec(f func(tx *Tx) error) error { ... } +``` + +### Operations + +One should be able to carry the following operations on a `dframe.Frame`: + +- retrieve the list of columns that a `Frame` is made of, +- create new columns that are the result of an operation on a set of already existing columns of that `Frame`, +- drop columns from a `Frame` +- append new data to a `Frame`, (either a new column or a new row) +- select a subset of columns from a `Frame` +- create different versions of a `Frame`: _e.g._ create `sub` from `Frame` `df` where `sub` is a subset of `df`. + diff --git a/dframe/dfmat/dfmat.go b/dframe/dfmat/dfmat.go new file mode 100644 index 0000000..b730643 --- /dev/null +++ b/dframe/dfmat/dfmat.go @@ -0,0 +1,144 @@ +// Copyright ©2019 The Gonum Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package dfmat provides a set of tools to easily leverage gonum/mat types +// from exp/dframe (and vice versa.) +// +// This is still a WIP package, building on the experience from: +// - https://github.com/kniren/gota +// - https://github.com/tobgu/qframe +// Ultimately, dframe should also allow for a good inter-operability with +// Apache Arrow: +// - https://godoc.org/github.com/apache/arrow/go/arrow +package dfmat // import "gonum.org/v1/exp/dframe/dfmat" + +import ( + "fmt" + + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "gonum.org/v1/exp/dframe" + "gonum.org/v1/gonum/mat" +) + +type Option func(c *config) + +// WithNames configures a dframe.Frame with the provided set of column names. +func WithNames(names ...string) Option { + return func(c *config) { + c.names = make([]string, len(names)) + copy(c.names, names) + } +} + +type config struct { + names []string +} + +func newConfig(n int) *config { + cfg := config{names: make([]string, n)} + for i := range cfg.names { + cfg.names[i] = fmt.Sprintf("col-%03d", i+1) + } + return &cfg +} + +// FromMatrix creates a new dframe.Frame from a gonum/mat.Matrix. +func FromMatrix(m mat.Matrix, opts ...Option) *dframe.Frame { + var ( + mem = memory.NewGoAllocator() + + r, c = m.Dims() + arrs = make([]array.Interface, c) + fields = make([]arrow.Field, c) + ) + + cfg := newConfig(c) + for _, opt := range opts { + opt(cfg) + } + + bld := array.NewFloat64Builder(mem) + defer bld.Release() + + switch m := m.(type) { + case mat.RawColViewer: + for i := 0; i < c; i++ { + col := m.RawColView(i) + bld.AppendValues(col, nil) + arrs[i] = bld.NewArray() + fields[i] = arrow.Field{ + Name: cfg.names[i], + Type: arrs[i].DataType(), + } + } + default: + for i := 0; i < c; i++ { + for j := 0; j < r; j++ { + bld.Append(m.At(j, i)) + } + arrs[i] = bld.NewArray() + fields[i] = arrow.Field{ + Name: cfg.names[i], + Type: arrs[i].DataType(), + } + } + } + + schema := arrow.NewSchema(fields, nil) + df, err := dframe.FromArrays(schema, arrs) + if err != nil { + panic(err) + } + + return df +} + +// FromVector creates a new dframe.Frame from a gonum/mat.Vector. +func FromVector(vec mat.Vector, opts ...Option) *dframe.Frame { + var ( + mem = memory.NewGoAllocator() + + rows = vec.Len() + arrs = make([]array.Interface, 1) + fields = make([]arrow.Field, 1) + ) + + cfg := newConfig(1) + for _, opt := range opts { + opt(cfg) + } + + bld := array.NewFloat64Builder(mem) + defer bld.Release() + + switch vec := vec.(type) { + case mat.RawColViewer: + col := vec.RawColView(0) + bld.AppendValues(col, nil) + arrs[0] = bld.NewArray() + fields[0] = arrow.Field{ + Name: cfg.names[0], + Type: arrs[0].DataType(), + } + default: + for i := 0; i < rows; i++ { + bld.Append(vec.AtVec(i)) + } + arrs[0] = bld.NewArray() + fields[0] = arrow.Field{ + Name: cfg.names[0], + Type: arrs[0].DataType(), + } + } + + schema := arrow.NewSchema(fields, nil) + df, err := dframe.FromArrays(schema, arrs) + if err != nil { + panic(err) + } + + return df +} diff --git a/dframe/dfmat/dfmat_example_test.go b/dframe/dfmat/dfmat_example_test.go new file mode 100644 index 0000000..0701f18 --- /dev/null +++ b/dframe/dfmat/dfmat_example_test.go @@ -0,0 +1,107 @@ +// Copyright ©2019 The Gonum Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package dfmat_test + +import ( + "fmt" + + "github.com/apache/arrow/go/arrow/array" + + "gonum.org/v1/exp/dframe" + "gonum.org/v1/exp/dframe/dfmat" + "gonum.org/v1/gonum/mat" +) + +func columnNames(df *dframe.Frame) []string { + names := make([]string, df.NumCols()) + for i := range names { + names[i] = df.Name(i) + } + return names +} + +func Example_fromMatrix() { + m := mat.NewDense(3, 2, []float64{ + 1, 2, + 3, 4, + 5, 6, + }) + + { + df := dfmat.FromMatrix(m, dfmat.WithNames("x", "y")) + defer df.Release() + + fmt.Printf("cols: %v\n", columnNames(df)) + + tr := array.NewTableReader(df, -1) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + } + + { + df := dfmat.FromMatrix(m.T(), dfmat.WithNames("x", "y", "z")) + defer df.Release() + + fmt.Printf("cols: %v\n", columnNames(df)) + + tr := array.NewTableReader(df, -1) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + } + + // Output: + // cols: [x y] + // rec[0]["x"]: [1 3 5] + // rec[0]["y"]: [2 4 6] + // cols: [x y z] + // rec[0]["x"]: [1 2] + // rec[0]["y"]: [3 4] + // rec[0]["z"]: [5 6] +} + +func Example_fromVector() { + m := mat.NewVecDense(6, []float64{ + 1, 2, + 3, 4, + 5, 6, + }) + + df := dfmat.FromVector(m, dfmat.WithNames("x")) + defer df.Release() + + fmt.Printf("cols: %v\n", columnNames(df)) + + tr := array.NewTableReader(df, -1) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + + // Output: + // cols: [x] + // rec[0]["x"]: [1 2 3 4 5 6] +} diff --git a/dframe/dframe.go b/dframe/dframe.go index 73ed59f..a14173d 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -9,5 +9,619 @@ // - https://github.com/tobgu/qframe // Ultimately, dframe should also allow for a good inter-operability with // Apache Arrow: -// - https://github.com/apache/arrow -package dframe +// - https://godoc.org/github.com/apache/arrow/go/arrow +package dframe // import "gonum.org/v1/exp/dframe" + +import ( + "sort" + "sync" + "sync/atomic" + + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "github.com/pkg/errors" +) + +// Frame is a Go-based data frame built on top of Apache Arrow. +type Frame struct { + mu sync.RWMutex // serialize creation of transactions + + refs int64 // reference count + err error // first error encountered + mem memory.Allocator + schema *arrow.Schema + + cols []array.Column + rows int64 +} + +var _ array.Table = (*Frame)(nil) + +// Dict is a map of string to array of data. +type Dict map[string]interface{} + +// FromMem creates a new data frame from the provided in-memory data. +func FromMem(dict Dict, opts ...Option) (*Frame, error) { + var ( + err error + mem = memory.NewGoAllocator() + arrs = make([]array.Interface, 0, len(dict)) + fields = make([]arrow.Field, 0, len(dict)) + ) + + keys := make([]string, 0, len(dict)) + for k := range dict { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + v := dict[k] + func(k string, v interface{}) { + var ( + arr array.Interface + ) + switch v := v.(type) { + case []bool: + bld := array.NewBooleanBuilder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []int8: + bld := array.NewInt8Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []int16: + bld := array.NewInt16Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []int32: + bld := array.NewInt32Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []int64: + bld := array.NewInt64Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []uint8: + bld := array.NewUint8Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []uint16: + bld := array.NewUint16Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []uint32: + bld := array.NewUint32Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []uint64: + bld := array.NewUint64Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []float32: + bld := array.NewFloat32Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []float64: + bld := array.NewFloat64Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []string: + bld := array.NewStringBuilder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []uint: + bld := array.NewUint64Builder(mem) + defer bld.Release() + + vs := make([]uint64, len(v)) + for i, e := range v { + vs[i] = uint64(e) + } + + bld.AppendValues(vs, nil) + arr = bld.NewArray() + + case []int: + bld := array.NewInt64Builder(mem) + defer bld.Release() + + vs := make([]int64, len(v)) + for i, e := range v { + vs[i] = int64(e) + } + + bld.AppendValues(vs, nil) + arr = bld.NewArray() + + default: + if err == nil { + err = errors.Errorf("dframe: invalid data type for %q (%T)", k, v) + return + } + } + + arrs = append(arrs, arr) + fields = append(fields, arrow.Field{Name: k, Type: arr.DataType()}) + + }(k, v) + } + + defer func() { + for i := range arrs { + arrs[i].Release() + } + }() + + if err != nil { + return nil, err + } + + schema := arrow.NewSchema(fields, nil) + return FromArrays(schema, arrs, opts...) +} + +// FromArrays creates a new data frame from the provided schema and arrays. +func FromArrays(schema *arrow.Schema, arrs []array.Interface, opts ...Option) (*Frame, error) { + df := &Frame{ + refs: 1, + mem: memory.NewGoAllocator(), + schema: schema, + rows: -1, + } + + for _, opt := range opts { + err := opt(df) + if err != nil { + return nil, err + } + } + + if df.rows < 0 { + switch len(arrs) { + case 0: + df.rows = 0 + default: + df.rows = int64(arrs[0].Len()) + } + } + + if df.schema == nil { + return nil, errors.Errorf("dframe: nil schema") + } + + if len(df.schema.Fields()) != len(arrs) { + return nil, errors.Errorf("dframe: inconsistent schema/arrays") + } + + for i, arr := range arrs { + ft := df.schema.Field(i) + if arr.DataType() != ft.Type { + return nil, errors.Errorf("dframe: column %q is inconsitent with schema", ft.Name) + } + + if int64(arr.Len()) < df.rows { + return nil, errors.Errorf("dframe: column %q expected length >= %d but got length %d", ft.Name, df.rows, arr.Len()) + } + } + + df.cols = make([]array.Column, len(arrs)) + for i := range arrs { + func(i int) { + chunk := array.NewChunked(arrs[i].DataType(), []array.Interface{arrs[i]}) + defer chunk.Release() + + col := array.NewColumn(df.schema.Field(i), chunk) + df.cols[i] = *col + }(i) + } + + return df, nil +} + +// FromCols creates a new data frame from the provided schema and columns. +func FromCols(cols []array.Column, opts ...Option) (*Frame, error) { + df := &Frame{ + refs: 1, + mem: memory.NewGoAllocator(), + cols: cols, + rows: -1, + } + + for _, opt := range opts { + err := opt(df) + if err != nil { + return nil, err + } + } + + if df.rows < 0 { + switch len(df.cols) { + case 0: + df.rows = 0 + default: + df.rows = int64(df.cols[0].Len()) + } + } + + fields := make([]arrow.Field, len(cols)) + for i, col := range cols { + fields[i].Name = col.Name() + fields[i].Type = col.DataType() + } + df.schema = arrow.NewSchema(fields, nil) + + // validate the data frame and its constituents. + // note we retain the columns after having validated the data frame + // in case the validation fails and panics (and would otherwise leak + // a ref-count on the columns.) + err := df.validate() + if err != nil { + return nil, err + } + + for i := range df.cols { + df.cols[i].Retain() + } + + return df, nil +} + +// FromTable creates a new data frame from the provided arrow table. +func FromTable(tbl array.Table, opts ...Option) (*Frame, error) { + df := &Frame{ + refs: 1, + mem: memory.NewGoAllocator(), + schema: tbl.Schema(), + rows: tbl.NumRows(), + } + + for _, opt := range opts { + err := opt(df) + if err != nil { + return nil, err + } + } + + df.cols = make([]array.Column, tbl.NumCols()) + for i := range df.cols { + col := tbl.Column(i) + end := int64(col.Len()) + df.cols[i] = *col.NewSlice(0, end) + } + + return df, nil +} + +// FromFrame returns a new data frame created by applying the provided +// transaction on the provided frame. +func FromFrame(df *Frame, f func(tx *Tx) error) (*Frame, error) { + out := df.clone() + err := out.Exec(f) + if err != nil { + out.Release() + return nil, err + } + + return out, nil +} + +func (df *Frame) validate() error { + if len(df.cols) != len(df.schema.Fields()) { + return errors.New("dframe: table schema mismatch") + } + for i, col := range df.cols { + if !col.Field().Equal(df.schema.Field(i)) { + return errors.Errorf("dframe: column field %q is inconsistent with schema", col.Name()) + } + + if int64(col.Len()) < df.rows { + return errors.Errorf("dframe: column %q expected length >= %d but got length %d", col.Name(), df.rows, col.Len()) + } + } + return nil +} + +// Option configures an aspect of a data frame. +type Option func(*Frame) error + +// WithMemAllocator configures a data frame to use the provided memory allocator. +func WithMemAllocator(mem memory.Allocator) Option { + return func(df *Frame) error { + df.mem = mem + return nil + } +} + +// Err returns the first error encountered during operations on a Frame. +func (df *Frame) Err() error { + return df.err +} + +// Retain increases the reference count by 1. +// Retain may be called simultaneously from multiple goroutines. +func (df *Frame) Retain() { + atomic.AddInt64(&df.refs, 1) +} + +// Release decreases the reference count by 1. +// When the reference count goes to zero, the memory is freed. +// Release may be called simultaneously from multiple goroutines. +func (df *Frame) Release() { + if atomic.LoadInt64(&df.refs) <= 0 { + panic("dframe: too many releases") + } + + if atomic.AddInt64(&df.refs, -1) == 0 { + for i := range df.cols { + df.cols[i].Release() + } + df.cols = nil + } +} + +// Schema returns the schema of this Frame. +func (df *Frame) Schema() *arrow.Schema { + return df.schema +} + +// NumRows returns the number of rows of this Frame. +func (df *Frame) NumRows() int64 { + return df.rows +} + +// NumCols returns the number of columns of this Frame. +func (df *Frame) NumCols() int64 { + return int64(len(df.cols)) +} + +// Column returns the i-th column of this Frame. +func (df *Frame) Column(i int) *array.Column { + return &df.cols[i] +} + +// Name returns the name of the i-th column of this Frame. +func (df *Frame) Name(i int) string { + return df.Column(i).Name() +} + +// Exec runs the provided function inside an atomic read-write transaction, +// applied on this Frame. +func (df *Frame) Exec(f func(tx *Tx) error) error { + df.mu.Lock() + defer df.mu.Unlock() + + if df.err != nil { + return df.err + } + + tx := newTx(df) + defer tx.Close() + + err := f(tx) + if err != nil { + return err + } + if tx.Err() != nil { + return tx.Err() + } + + df.swap(tx.df) + return nil +} + +// RExec runs the provided function inside an atomic read-only transaction, +// applied on this Frame. +func (df *Frame) RExec(f func(tx *Tx) error) error { + df.mu.RLock() + defer df.mu.RUnlock() + + if df.err != nil { + return df.err + } + + tx := newRTx(df) + defer tx.Close() + + err := f(tx) + if err != nil { + return err + } + + return tx.Err() +} + +func (lhs *Frame) swap(rhs *Frame) { + rhs.refs = atomic.SwapInt64(&lhs.refs, rhs.refs) + lhs.mem, rhs.mem = rhs.mem, lhs.mem + lhs.schema, rhs.schema = rhs.schema, lhs.schema + lhs.rows, rhs.rows = rhs.rows, lhs.rows + lhs.cols, rhs.cols = rhs.cols, lhs.cols +} + +func (df *Frame) clone() *Frame { + o := &Frame{ + refs: 1, + mem: df.mem, + schema: df.schema, + cols: make([]array.Column, len(df.cols)), + rows: df.rows, + } + copy(o.cols, df.cols) + for i := range o.cols { + o.cols[i].Retain() + } + return o +} + +// Tx represents a read-only or read/write transaction on a data frame. +type Tx struct { + df *Frame + rw bool // read-write access + err error +} + +func newTx(df *Frame) *Tx { + tx := &Tx{df: df.clone(), rw: true} + return tx +} + +func newRTx(df *Frame) *Tx { + tx := &Tx{df: df.clone(), rw: false} + return tx +} + +func (tx *Tx) Close() error { + if tx.err != nil { + return tx.err + } + + tx.df.Release() + return nil +} + +func (tx *Tx) Err() error { + return tx.err +} + +// Copy copies the content of the column named src to the column named dst. +// +// Copy fails if src does not exist. +// Copy fails if dst already exist. +func (tx *Tx) Copy(dst, src string) *Tx { + if tx.err != nil { + return tx + } + + if !tx.rw { + tx.err = errors.Errorf("dframe: r/w operation on read-only transaction") + return tx + } + + if tx.df.Schema().HasField(dst) { + tx.err = errors.Errorf("dframe: column %q already exists", dst) + return tx + } + if !tx.df.Schema().HasField(src) { + tx.err = errors.Errorf("dframe: no column named %q", src) + return tx + } + + isrc := tx.df.Schema().FieldIndex(src) + idst := len(tx.df.Schema().Fields()) + + fields := make([]arrow.Field, len(tx.df.Schema().Fields())+1) + copy(fields, tx.df.Schema().Fields()) + + fields[idst] = fields[isrc] + fields[idst].Name = dst + + md := tx.df.Schema().Metadata() + tx.df.schema = arrow.NewSchema(fields, &md) + + col := array.NewColumn(fields[idst], tx.df.cols[isrc].Data()) + tx.df.cols = append(tx.df.cols, *col) + return tx +} + +// Slice creates a new frame consisting of rows[beg:end]. +func (tx *Tx) Slice(beg, end int) *Tx { + if tx.err != nil { + return tx + } + + if !tx.rw { + tx.err = errors.Errorf("dframe: r/w operation on read-only transaction") + return tx + } + + if int64(end) > tx.df.rows || beg > end { + tx.err = errors.Errorf("dframe: index out of range") + return tx + } + + cols := make([]array.Column, tx.df.NumCols()) + for i := range cols { + cols[i] = *tx.df.Column(i).NewSlice(int64(beg), int64(end)) + } + + for _, col := range tx.df.cols { + col.Release() + } + + tx.df.cols = cols + tx.df.rows = int64(end - beg) + return tx +} + +func (tx *Tx) Drop(cols ...string) *Tx { + if tx.err != nil || len(cols) == 0 { + return tx + } + + if !tx.rw { + tx.err = errors.Errorf("dframe: r/w operation on read-only transaction") + return tx + } + + set := make(map[string]struct{}, len(cols)) + for _, col := range cols { + set[col] = struct{}{} + } + + cs := make([]array.Column, 0, len(tx.df.cols)-len(cols)) + fs := make([]arrow.Field, 0, len(tx.df.Schema().Fields())-len(cols)) + + for i := range tx.df.cols { + col := &tx.df.cols[i] + if _, ok := set[col.Name()]; ok { + col.Release() + continue + } + cs = append(cs, *col) + fs = append(fs, tx.df.Schema().Field(i)) + } + + md := tx.df.Schema().Metadata() // FIXME(sbinet): also remove metadata of removed cols. + sc := arrow.NewSchema(fs, &md) + + tx.df.cols = cs + tx.df.schema = sc + return tx +} diff --git a/dframe/dframe_example_test.go b/dframe/dframe_example_test.go new file mode 100644 index 0000000..280ef7f --- /dev/null +++ b/dframe/dframe_example_test.go @@ -0,0 +1,347 @@ +// Copyright ©2019 The Gonum Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package dframe_test + +import ( + "fmt" + "log" + + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "gonum.org/v1/exp/dframe" +) + +func columnNames(df *dframe.Frame) []string { + names := make([]string, df.NumCols()) + for i := range names { + names[i] = df.Name(i) + } + return names +} + +func ExampleFrame_fromTable() { + pool := memory.NewGoAllocator() + + schema := arrow.NewSchema( + []arrow.Field{ + {Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, + }, + nil, + ) + + b := array.NewRecordBuilder(pool, schema) + defer b.Release() + + b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 5, 6}, nil) + b.Field(0).(*array.Int32Builder).AppendValues([]int32{7, 8, 9, 10}, []bool{true, true, false, true}) + b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + rec1 := b.NewRecord() + defer rec1.Release() + + b.Field(0).(*array.Int32Builder).AppendValues([]int32{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, nil) + b.Field(1).(*array.Float64Builder).AppendValues([]float64{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, nil) + + rec2 := b.NewRecord() + defer rec2.Release() + + tbl := array.NewTableFromRecords(schema, []array.Record{rec1, rec2}) + defer tbl.Release() + + df, err := dframe.FromTable(tbl) + if err != nil { + log.Fatal(err) + } + defer df.Release() + + fmt.Printf("cols: %v\n", columnNames(df)) + + err = df.Exec(func(tx *dframe.Tx) error { + tx.Drop("f1-i32") + tx.Copy("fx-f64", "f2-f64") + return nil + }) + if err != nil { + log.Fatal(err) + } + + fmt.Printf("cols: %v\n", columnNames(df)) + + tr := array.NewTableReader(df, 5) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + + // Output: + // cols: [f1-i32 f2-f64] + // cols: [f2-f64 fx-f64] + // rec[0]["f2-f64"]: [1 2 3 4 5] + // rec[0]["fx-f64"]: [1 2 3 4 5] + // rec[1]["f2-f64"]: [6 7 8 9 10] + // rec[1]["fx-f64"]: [6 7 8 9 10] + // rec[2]["f2-f64"]: [11 12 13 14 15] + // rec[2]["fx-f64"]: [11 12 13 14 15] + // rec[3]["f2-f64"]: [16 17 18 19 20] + // rec[3]["fx-f64"]: [16 17 18 19 20] +} + +func ExampleFrame_fromArrays() { + pool := memory.NewGoAllocator() + + schema := arrow.NewSchema( + []arrow.Field{ + {Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, + }, + nil, + ) + + b := array.NewRecordBuilder(pool, schema) + defer b.Release() + + b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + rec := b.NewRecord() + defer rec.Release() + + df, err := dframe.FromArrays(schema, rec.Columns()) + if err != nil { + log.Fatal(err) + } + defer df.Release() + + fmt.Printf("cols: %v\n", columnNames(df)) + + err = df.Exec(func(tx *dframe.Tx) error { + tx.Drop("f1-i32") + tx.Copy("fx-f64", "f2-f64") + tx.Slice(3, 8) + return nil + }) + if err != nil { + log.Fatal(err) + } + fmt.Printf("cols: %v\n", columnNames(df)) + + tr := array.NewTableReader(df, -1) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + + // Output: + // cols: [f1-i32 f2-f64] + // cols: [f2-f64 fx-f64] + // rec[0]["f2-f64"]: [4 5 6 7 8] + // rec[0]["fx-f64"]: [4 5 6 7 8] +} + +func ExampleFrame_fromCols() { + pool := memory.NewGoAllocator() + + schema := arrow.NewSchema( + []arrow.Field{ + {Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, + }, + nil, + ) + + b := array.NewRecordBuilder(pool, schema) + defer b.Release() + + b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + rec := b.NewRecord() + defer rec.Release() + + cols := func() []array.Column { + var cols []array.Column + for i, field := range schema.Fields() { + chunk := array.NewChunked(field.Type, []array.Interface{rec.Column(i)}) + defer chunk.Release() + col := array.NewColumn(field, chunk) + cols = append(cols, *col) + } + return cols + }() + + df, err := dframe.FromCols(cols) + if err != nil { + log.Fatal(err) + } + defer df.Release() + + fmt.Printf("cols: %v\n", columnNames(df)) + + err = df.Exec(func(tx *dframe.Tx) error { + tx.Drop("f1-i32") + tx.Copy("fx-f64", "f2-f64") + tx.Slice(3, 8) + return nil + }) + if err != nil { + log.Fatal(err) + } + + fmt.Printf("cols: %v\n", columnNames(df)) + + tr := array.NewTableReader(df, 5) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + + // Output: + // cols: [f1-i32 f2-f64] + // cols: [f2-f64 fx-f64] + // rec[0]["f2-f64"]: [4 5 6 7 8] + // rec[0]["fx-f64"]: [4 5 6 7 8] +} + +func ExampleFrame_fromFrame() { + pool := memory.NewGoAllocator() + + schema := arrow.NewSchema( + []arrow.Field{ + {Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, + }, + nil, + ) + + b := array.NewRecordBuilder(pool, schema) + defer b.Release() + + b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + rec := b.NewRecord() + defer rec.Release() + + df, err := dframe.FromArrays(schema, rec.Columns()) + if err != nil { + log.Fatal(err) + } + defer df.Release() + + fmt.Printf("cols: %v\n", columnNames(df)) + + sub, err := dframe.FromFrame(df, func(tx *dframe.Tx) error { + tx.Drop("f1-i32") + tx.Copy("fx-f64", "f2-f64") + tx.Slice(3, 8) + return nil + }) + if err != nil { + log.Fatal(err) + } + defer sub.Release() + + fmt.Printf("sub: %v\n", columnNames(sub)) + fmt.Printf("cols: %v\n", columnNames(df)) + + for i, df := range []*dframe.Frame{df, sub} { + fmt.Printf("--- frame %d ---\n", i) + tr := array.NewTableReader(df, -1) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + } + + // Output: + // cols: [f1-i32 f2-f64] + // sub: [f2-f64 fx-f64] + // cols: [f1-i32 f2-f64] + // --- frame 0 --- + // rec[0]["f1-i32"]: [1 2 3 4 5 6 7 8 9 10] + // rec[0]["f2-f64"]: [1 2 3 4 5 6 7 8 9 10] + // --- frame 1 --- + // rec[0]["f2-f64"]: [4 5 6 7 8] + // rec[0]["fx-f64"]: [4 5 6 7 8] +} + +func ExampleFrame_fromMem() { + df, err := dframe.FromMem(dframe.Dict{ + "f1-i32": []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + "f2-f64": []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + }) + if err != nil { + log.Fatal(err) + } + defer df.Release() + + fmt.Printf("cols: %v\n", columnNames(df)) + + sub, err := dframe.FromFrame(df, func(tx *dframe.Tx) error { + tx.Drop("f1-i32") + tx.Copy("fx-f64", "f2-f64") + tx.Slice(3, 8) + return nil + }) + if err != nil { + log.Fatal(err) + } + defer sub.Release() + + fmt.Printf("sub: %v\n", columnNames(sub)) + fmt.Printf("cols: %v\n", columnNames(df)) + + for i, df := range []*dframe.Frame{df, sub} { + fmt.Printf("--- frame %d ---\n", i) + tr := array.NewTableReader(df, -1) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + } + + // Output: + // cols: [f1-i32 f2-f64] + // sub: [f2-f64 fx-f64] + // cols: [f1-i32 f2-f64] + // --- frame 0 --- + // rec[0]["f1-i32"]: [1 2 3 4 5 6 7 8 9 10] + // rec[0]["f2-f64"]: [1 2 3 4 5 6 7 8 9 10] + // --- frame 1 --- + // rec[0]["f2-f64"]: [4 5 6 7 8] + // rec[0]["fx-f64"]: [4 5 6 7 8] +} diff --git a/linsolve/cg_example_test.go b/linsolve/cg_example_test.go index ab67902..869afc0 100644 --- a/linsolve/cg_example_test.go +++ b/linsolve/cg_example_test.go @@ -1,4 +1,4 @@ -// Copyright ©2016 The gonum Authors. All rights reserved. +// Copyright ©2016 The Gonum Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/linsolve/gmres.go b/linsolve/gmres.go index 2fec7e3..ad781ba 100644 --- a/linsolve/gmres.go +++ b/linsolve/gmres.go @@ -1,4 +1,4 @@ -// Copyright ©2017 The gonum Authors. All rights reserved. +// Copyright ©2017 The Gonum Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/linsolve/internal/mmarket/reader.go b/linsolve/internal/mmarket/reader.go index e67823e..eaf92d4 100644 --- a/linsolve/internal/mmarket/reader.go +++ b/linsolve/internal/mmarket/reader.go @@ -1,4 +1,4 @@ -// Copyright ©2017 The gonum Authors. All rights reserved. +// Copyright ©2017 The Gonum Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/linsolve/internal/triplet/triplet.go b/linsolve/internal/triplet/triplet.go index 9405a5e..bed1fd3 100644 --- a/linsolve/internal/triplet/triplet.go +++ b/linsolve/internal/triplet/triplet.go @@ -1,4 +1,4 @@ -// Copyright ©2017 The gonum Authors. All rights reserved. +// Copyright ©2017 The Gonum Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/linsolve/iterative_test.go b/linsolve/iterative_test.go index f736763..6cb96ef 100644 --- a/linsolve/iterative_test.go +++ b/linsolve/iterative_test.go @@ -1,4 +1,4 @@ -// Copyright ©2017 The Gonum authors. All rights reserved. +// Copyright ©2017 The Gonum Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/linsolve/linsolve_test.go b/linsolve/linsolve_test.go index 5680e93..178554d 100644 --- a/linsolve/linsolve_test.go +++ b/linsolve/linsolve_test.go @@ -1,4 +1,4 @@ -// Copyright ©2017 The Gonum authors. All rights reserved. +// Copyright ©2017 The Gonum Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file.