diff --git a/api/mime.go b/api/mime.go index e24bf3fc27..525e8ad3d0 100644 --- a/api/mime.go +++ b/api/mime.go @@ -8,16 +8,17 @@ import ( ) const ( - MediaTypeAny = "*/*" - MediaTypeCSV = "text/csv" - MediaTypeJSON = "application/json" - MediaTypeLine = "application/x-line" - MediaTypeNDJSON = "application/x-ndjson" - MediaTypeParquet = "application/x-parquet" - MediaTypeZeek = "application/x-zeek" - MediaTypeZJSON = "application/x-zjson" - MediaTypeZNG = "application/x-zng" - MediaTypeZSON = "application/x-zson" + MediaTypeAny = "*/*" + MediaTypeArrowStream = "application/vnd.apache.arrow.stream" + MediaTypeCSV = "text/csv" + MediaTypeJSON = "application/json" + MediaTypeLine = "application/x-line" + MediaTypeNDJSON = "application/x-ndjson" + MediaTypeParquet = "application/x-parquet" + MediaTypeZeek = "application/x-zeek" + MediaTypeZJSON = "application/x-zjson" + MediaTypeZNG = "application/x-zng" + MediaTypeZSON = "application/x-zson" ) type ErrUnsupportedMimeType struct { @@ -41,6 +42,8 @@ func MediaTypeToFormat(s string, dflt string) (string, error) { switch typ { case MediaTypeAny, "": return dflt, nil + case MediaTypeArrowStream: + return "arrows", nil case MediaTypeCSV: return "csv", nil case MediaTypeJSON: @@ -65,6 +68,8 @@ func MediaTypeToFormat(s string, dflt string) (string, error) { func FormatToMediaType(format string) string { switch format { + case "arrows": + return MediaTypeArrowStream case "csv": return MediaTypeCSV case "json": diff --git a/cli/inputflags/flags.go b/cli/inputflags/flags.go index 7553e1b52d..209d12b777 100644 --- a/cli/inputflags/flags.go +++ b/cli/inputflags/flags.go @@ -27,7 +27,7 @@ func (f *Flags) Options() anyio.ReaderOpts { } func (f *Flags) SetFlags(fs *flag.FlagSet, validate bool) { - fs.StringVar(&f.Format, "i", "auto", "format of input data [auto,zng,vng,json,zeek,zjson,csv,parquet,line]") + fs.StringVar(&f.Format, "i", "auto", "format of input data [arrows,auto,zng,vng,json,zeek,zjson,csv,parquet,line]") 
fs.BoolVar(&f.ZNG.Validate, "validate", validate, "validate the input format when reading ZNG streams") fs.IntVar(&f.ZNG.Threads, "threads", 0, "number of threads used for scanning ZNG input") f.ReadMax = auto.NewBytes(zngio.MaxSize) diff --git a/cli/outputflags/flags.go b/cli/outputflags/flags.go index 15797195e4..0598b50bb5 100644 --- a/cli/outputflags/flags.go +++ b/cli/outputflags/flags.go @@ -75,7 +75,7 @@ func (f *Flags) SetFormatFlags(fs *flag.FlagSet) { if f.DefaultFormat == "" { f.DefaultFormat = "zng" } - fs.StringVar(&f.Format, "f", f.DefaultFormat, "format for output data [zng,vng,json,parquet,table,text,csv,lake,zeek,zjson,zson]") + fs.StringVar(&f.Format, "f", f.DefaultFormat, "format for output data [arrows,zng,vng,json,parquet,table,text,csv,lake,zeek,zjson,zson]") fs.BoolVar(&f.jsonShortcut, "j", false, "use line-oriented JSON output independent of -f option") fs.BoolVar(&f.zsonShortcut, "z", false, "use line-oriented ZSON output independent of -f option") fs.BoolVar(&f.zsonPretty, "Z", false, "use formatted ZSON output independent of -f option") diff --git a/docs/commands/zq.md b/docs/commands/zq.md index 55cfe135dd..b7e056a904 100644 --- a/docs/commands/zq.md +++ b/docs/commands/zq.md @@ -97,6 +97,7 @@ Note here that the query `1+1` [implies](../language/overview.md#26-implied-oper | Option | Auto | Specification | |-----------|------|------------------------------------------| +| `arrows` | yes | [Arrow IPC Stream Format](https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format) | | `json` | yes | [JSON RFC 8259](https://www.rfc-editor.org/rfc/rfc8259.html) | | `csv` | yes | [CSV RFC 4180](https://www.rfc-editor.org/rfc/rfc4180.html) | | `parquet` | no | [Apache Parquet](https://github.com/apache/parquet-format) | @@ -285,14 +286,14 @@ produces ### 3.4 Schema-rigid Outputs -Certain data formats like Parquet are "schema rigid" in the sense that they -require a schema to be defined before values can be written into the file and -all 
the values in the file must conform to this schema. +Certain data formats like Arrow and Parquet are "schema rigid" in the sense that +they require a schema to be defined before values can be written into the file +and all the values in the file must conform to this schema. Zed, however, has a fine-grained type system instead of schemas and a sequence of data values are completely self-describing and may be heterogeneous in nature. This creates a challenge converting the type-flexible Zed formats to a schema-rigid -format like Parquet. +format like Arrow or Parquet. For example, this seemingly simple conversion: ```mdtest-command fails @@ -319,8 +320,8 @@ but the data was necessarily changed (by inserting nulls): #### 3.4.2 Splitting Schemas -Another common approach to dealing with the schema-rigid limitation of Parquet -is to create a separate file for each schema. +Another common approach to dealing with the schema-rigid limitation of Arrow and +Parquet is to create a separate file for each schema. `zq` can do this too with the `-split` option, which specifies a path to a directory for the output files. 
If the path is `.`, then files diff --git a/docs/lake/api.md b/docs/lake/api.md index 7c330e254a..5df2bb2927 100644 --- a/docs/lake/api.md +++ b/docs/lake/api.md @@ -516,6 +516,7 @@ The supported MIME types are as follows: | Format | MIME Type | | ------ | --------- | +| Arrow IPC Stream | application/vnd.apache.arrow.stream | | CSV | text/csv | | JSON | application/json | | NDJSON | application/x-ndjson | diff --git a/docs/tutorials/schools.md b/docs/tutorials/schools.md index e59fbc52af..04bb675765 100644 --- a/docs/tutorials/schools.md +++ b/docs/tutorials/schools.md @@ -721,8 +721,8 @@ Geyserville New Tech Academy,Geyserville Unified,Geyserville,Sonoma,95441-9670,3 ,,,,,,,,,,,,,,,,Sonoma,Geyserville Unified,Geyserville New Tech Academy ,,,,,,,,,,,,,,,,Sonoma,Geyserville Unified, ``` -In addition to the `csv` format, the `parquet`, `table`, and `zeek` formats -also benefit from fused records. +In addition to the `csv` format, the `arrows`, `parquet`, `table`, and `zeek` +formats also benefit from fused records. 
### 4.4 [put](../language/operators/drop.md) diff --git a/go.mod b/go.mod index 100002e8be..a6627a31aa 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/agnivade/levenshtein v1.1.1 github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 + github.com/apache/arrow/go/v11 v11.0.0-20221214174703-0dfec8e98f4f github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de github.com/aws/aws-sdk-go v1.36.17 github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f @@ -31,8 +32,8 @@ require ( go.uber.org/multierr v1.8.0 go.uber.org/zap v1.23.0 golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b - golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 - golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f + golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde + golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 golang.org/x/text v0.3.7 gopkg.in/natefinch/lumberjack.v2 v2.0.0 @@ -40,27 +41,39 @@ require ( ) require ( + github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect + github.com/andybalholm/brotli v1.0.4 // indirect github.com/apache/thrift v0.16.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/golang/protobuf v1.4.3 // indirect + github.com/goccy/go-json v0.9.11 // indirect + github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/flatbuffers v2.0.8+incompatible // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/mattn/go-isatty v0.0.12 // indirect + github.com/klauspost/asmfmt v1.3.2 // indirect + github.com/klauspost/compress v1.15.9 // indirect + github.com/klauspost/cpuid/v2 v2.0.9 // indirect + 
github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-runewidth v0.0.10 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect - github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect + github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect + github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/prometheus/common v0.10.0 // indirect github.com/prometheus/procfs v0.1.3 // indirect github.com/rivo/uniseg v0.1.0 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/otel v0.16.0 // indirect - go.uber.org/atomic v1.10.0 // indirect + go.uber.org/atomic v1.7.0 // indirect + golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect - golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect - google.golang.org/protobuf v1.25.1-0.20201020201750-d3470999428b // indirect - gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect + golang.org/x/tools v0.1.12 // indirect + golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect + google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect + google.golang.org/grpc v1.49.0 // indirect + google.golang.org/protobuf v1.28.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index c9b3e6fff8..48c2ef63b1 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,8 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= 
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8= github.com/agnivade/levenshtein v1.1.1/go.mod h1:veldBMzWxcCG2ZvUTKD2kJNRdCk5hVbJomOvKkmgYbo= @@ -8,6 +11,10 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/apache/arrow/go/v11 v11.0.0-20221214174703-0dfec8e98f4f h1:QUx+UUDqXqbmYrTUhaLVv6UZFQ13DT3uyK8JvweSvO4= +github.com/apache/arrow/go/v11 v11.0.0-20221214174703-0dfec8e98f4f/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI= github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhPwqqXc4/vE0f7GvRjuAsbW+HOIe8KnA= @@ -24,9 +31,11 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 
v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= @@ -41,6 +50,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fraugster/parquet-go v0.10.1-0.20220222153523-e6b70a8a7212 h1:u7X3aZRlWSm18x0EysX9szRULhH7QYQv7UkxW1yHbik= github.com/fraugster/parquet-go v0.10.1-0.20220222153523-e6b70a8a7212/go.mod h1:dGzUxdNqXsAijatByVgbAWVPlFirnhknQbdazcUIjY0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -57,9 +68,13 @@ github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEK github.com/go-redis/redis/v8 v8.4.11 h1:t2lToev01VTrqYQcv+QFbxtGgcf64K+VUMgf9Ap6A/E= github.com/go-redis/redis/v8 v8.4.11/go.mod h1:d5yY/TlkQyYBSBHnXUmnf1OrHbyQere5JV4dLKwvXmo= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk= 
+github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.5.0 h1:jlYHihg//f7RRwuPfptm04yp4s7O6Kw8EZiVYIGcH0g= github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -72,15 +87,19 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= -github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/flatbuffers v2.0.8+incompatible h1:ivUb1cGomAB101ZM1T0nOiWz9pSrTMoa9+EiY7igmkM= +github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= 
+github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gorilla/mux v1.7.5-0.20200711200521-98cb6bf42e08 h1:kPna6oIGlRXWmg/jkKfxbpvsl+0DHYnw1qQwN+6+gyA= @@ -101,9 +120,16 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod 
h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -111,13 +137,17 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= -github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= -github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.10 h1:CoZ3S2P7pvtP45xOtBw+/mDL2z0RKI576gSkzRRpdGg= github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= 
+github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -125,8 +155,6 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -160,6 +188,7 @@ github.com/prometheus/client_golang v1.7.1 h1:NTGy1Ja9pByO+xAeH/qiWnLrKtr3hJPNja github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0 
h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= @@ -171,6 +200,7 @@ github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFB github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rs/cors v1.8.0 h1:P2KMzcFwrPoSjkF1WLRPsp3UMLyql8L4v9hQpVeK5so= github.com/rs/cors v1.8.0/go.mod h1:EBwu+T5AvHOcXwvZIkQFjUN6s8Czyqw12GL/Y0tUyRM= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= @@ -189,6 +219,7 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -204,11 +235,13 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.4.13 h1:fVcFKWvrslecOb/tg+Cc05dkeYx540o0FuFt3nUVDoE= github.com/yuin/goldmark v1.4.13/go.mod 
h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.opentelemetry.io/otel v0.16.0 h1:uIWEbdeb4vpKPGITLsRVUS44L5oDbDUCZxn8lkxhmgw= go.opentelemetry.io/otel v0.16.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= -go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= @@ -219,11 +252,21 @@ golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b h1:SCE/18RnFsLrjydh/R/s5EVvHoZprqEQUuoxK8q2Pc4= golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint 
v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -232,13 +275,15 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde h1:ejfdSekXMDxDLbRrJMwUk6KnSLZ2McaUCVcIKM+N6jc= +golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -251,14 +296,14 @@ golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -267,12 +312,30 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= 
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f h1:uF6paiQQebLeSXkrTqHqz0MXhXXS1KgF41eUdBNvxK0= +golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0/go.mod 
h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw= +google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -280,13 +343,15 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.25.1-0.20201020201750-d3470999428b h1:jEdfCm+8YTWSYgU4L7Nq0jjU+q9RxIhi0cXLTY+Ih3A= -google.golang.org/protobuf v1.25.1-0.20201020201750-d3470999428b/go.mod h1:hFxJC2f0epmp1elRCiEGJTKAWbwxZ2nvqZdHl3FQXCY= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 
v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= -gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= @@ -306,3 +371,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/service/ztests/curl-load-arrows.yaml b/service/ztests/curl-load-arrows.yaml new file mode 100644 index 0000000000..5d727cbc71 --- /dev/null +++ b/service/ztests/curl-load-arrows.yaml @@ -0,0 +1,21 @@ +script: | + source service.sh + zed create -q test + zq -f arrows in.zson | + curl -H Content-Type:application/vnd.apache.arrow.stream --data-binary @- \ + --fail $ZED_LAKE/pool/test/branch/main | zq -z commit:=0 - + echo // + zed query -z 'from test' + +inputs: + - name: in.zson + data: | + {x:1} + - name: service.sh + +outputs: + - name: stdout + data: | + {commit:0,warnings:[]([string])} + // + {x:1} diff --git a/service/ztests/curl-load-error.yaml b/service/ztests/curl-load-error.yaml index 4ba0953803..d125fdea7d 100644 --- a/service/ztests/curl-load-error.yaml +++ b/service/ztests/curl-load-error.yaml @@ -16,7 +16,7 @@ 
inputs: outputs: - name: stdout data: | - {"type":"Error","kind":"invalid operation","error":"format detection error\n\tzeek: line 1: bad types/fields definition in zeek header\n\tzjson: line 1: invalid character 'T' looking for beginning of value\n\tzson: ZSON syntax error\n\tzng: malformed zng record\n\tcsv: line 1: EOF\n\tjson: invalid character 'T' looking for beginning of value\n\tparquet: auto-detection not supported\n\tvng: auto-detection not supported\n\tline: auto-detection not supported"} + {"type":"Error","kind":"invalid operation","error":"format detection error\n\tarrows: schema message length exceeds 1 MiB\n\tzeek: line 1: bad types/fields definition in zeek header\n\tzjson: line 1: invalid character 'T' looking for beginning of value\n\tzson: ZSON syntax error\n\tzng: malformed zng record\n\tcsv: line 1: EOF\n\tjson: invalid character 'T' looking for beginning of value\n\tparquet: auto-detection not supported\n\tvng: auto-detection not supported\n\tline: auto-detection not supported"} code 400 {"type":"Error","kind":"invalid operation","error":"unsupported MIME type: unsupported"} code 400 diff --git a/service/ztests/curl-query.yaml b/service/ztests/curl-query.yaml index ded07c627d..1d4632b97d 100644 --- a/service/ztests/curl-query.yaml +++ b/service/ztests/curl-query.yaml @@ -7,6 +7,9 @@ script: | curl -H "Accept: $accept" -d '{"query":"from test"}' $ZED_LAKE/query\?ctrl=true | sed -E '/QueryStats/s/[0-9]{3,}/xxx/g' # for ZJSON done + echo === application/vnd.apache.arrow.stream === + curl -H 'Accept: application/vnd.apache.arrow.stream' -d '{"query":"from test"}' $ZED_LAKE/query | + zq -z -i arrows - echo === application/x-parquet === curl -H 'Accept: application/x-parquet' -d '{"query":"from test"}' -o out.parquet $ZED_LAKE/query zq -z -i parquet out.parquet @@ -51,6 +54,9 @@ outputs: === === {a:"hello",b:{c:"world",d:"goodbye"}} {a:"one",b:{c:"two",d:"three"}} + === application/vnd.apache.arrow.stream === + {a:"hello",b:{c:"world",d:"goodbye"}} + 
{a:"one",b:{c:"two",d:"three"}} === application/x-parquet === {a:"hello",b:{c:"world",d:"goodbye"}} {a:"one",b:{c:"two",d:"three"}} diff --git a/service/ztests/load-garbage.yaml b/service/ztests/load-garbage.yaml index 42df70ec4b..f49da7ba50 100644 --- a/service/ztests/load-garbage.yaml +++ b/service/ztests/load-garbage.yaml @@ -13,6 +13,7 @@ outputs: - name: stderr data: | stdio:stdin: format detection error + arrows: schema message length exceeds 1 MiB zeek: line 1: bad types/fields definition in zeek header zjson: line 1: invalid character 'T' looking for beginning of value zson: ZSON syntax error diff --git a/zed_test.go b/zed_test.go index 70811d0ad3..b3d30216bd 100644 --- a/zed_test.go +++ b/zed_test.go @@ -16,6 +16,7 @@ import ( "github.com/brimdata/zed/runtime/op" "github.com/brimdata/zed/zio" "github.com/brimdata/zed/zio/anyio" + "github.com/brimdata/zed/zio/arrowio" "github.com/brimdata/zed/zio/parquetio" "github.com/brimdata/zed/zio/zngio" "github.com/brimdata/zed/zson" @@ -34,6 +35,7 @@ func TestZed(t *testing.T) { t.Parallel() data, err := loadZTestInputsAndOutputs(dirs) require.NoError(t, err) + runAllBoomerangs(t, "arrows", data) runAllBoomerangs(t, "parquet", data) runAllBoomerangs(t, "zson", data) }) @@ -151,7 +153,10 @@ func runOneBoomerang(t *testing.T, format, data string) { require.NoError(t, baselineWriter.Close()) } if err != nil { - if errors.Is(err, parquetio.ErrEmptyRecordType) || + if errors.Is(err, arrowio.ErrMultipleTypes) || + errors.Is(err, arrowio.ErrNotRecord) || + errors.Is(err, arrowio.ErrUnsupportedType) || + errors.Is(err, parquetio.ErrEmptyRecordType) || errors.Is(err, parquetio.ErrNullType) || errors.Is(err, parquetio.ErrUnionType) || strings.Contains(err.Error(), "Parquet output encountered non-record value") || diff --git a/zio/anyio/lookup.go b/zio/anyio/lookup.go index b9ceb94c09..5dbd4c702a 100644 --- a/zio/anyio/lookup.go +++ b/zio/anyio/lookup.go @@ -6,6 +6,7 @@ import ( "github.com/brimdata/zed" 
 	"github.com/brimdata/zed/zio"
+	"github.com/brimdata/zed/zio/arrowio"
 	"github.com/brimdata/zed/zio/csvio"
 	"github.com/brimdata/zed/zio/jsonio"
 	"github.com/brimdata/zed/zio/lineio"
@@ -19,6 +20,8 @@ import (
 
 func lookupReader(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.ReadCloser, error) {
 	switch opts.Format {
+	case "arrows":
+		return arrowio.NewReader(zctx, r)
 	case "csv":
 		return zio.NopReadCloser(csvio.NewReader(zctx, r)), nil
 	case "line":
diff --git a/zio/anyio/reader.go b/zio/anyio/reader.go
index 004b7a06fe..57be97fbb5 100644
--- a/zio/anyio/reader.go
+++ b/zio/anyio/reader.go
@@ -2,6 +2,7 @@ package anyio
 
 import (
 	"bufio"
+	"encoding/binary"
 	"errors"
 	"fmt"
 	"io"
@@ -10,6 +11,7 @@ import (
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/brimdata/zed"
 	"github.com/brimdata/zed/zio"
+	"github.com/brimdata/zed/zio/arrowio"
 	"github.com/brimdata/zed/zio/csvio"
 	"github.com/brimdata/zed/zio/jsonio"
 	"github.com/brimdata/zed/zio/zeekio"
@@ -35,6 +37,13 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea
 	recorder := NewRecorder(r)
 	track := NewTrack(recorder)
 
+	// Try Arrow first. On failure, keep the error (prefixed with the
+	// format name) for the combined report and reset track before the
+	// next format is tried.
+	arrowsErr := isArrowStream(track)
+	if arrowsErr == nil {
+		return arrowio.NewReader(zctx, recorder)
+	}
+	arrowsErr = fmt.Errorf("arrows: %w", arrowsErr)
+	track.Reset()
+
 	zeekErr := match(zeekio.NewReader(zed.NewContext(), track), "zeek", 1)
 	if zeekErr == nil {
 		return zio.NopReadCloser(zeekio.NewReader(zctx, recorder)), nil
@@ -94,7 +103,37 @@ func NewReaderWithOpts(zctx *zed.Context, r io.Reader, opts ReaderOpts) (zio.Rea
 	parquetErr := errors.New("parquet: auto-detection not supported")
 	vngErr := errors.New("vng: auto-detection not supported")
 	lineErr := errors.New("line: auto-detection not supported")
-	return nil, joinErrs([]error{zeekErr, zjsonErr, zsonErr, zngErr, csvErr, jsonErr, parquetErr, vngErr, lineErr})
+	return nil, joinErrs([]error{arrowsErr, zeekErr, zjsonErr, zsonErr, zngErr, csvErr, jsonErr, parquetErr, vngErr, lineErr})
+}
+
+// isArrowStream reports whether the data available from track looks like an
+// Arrow IPC stream. It returns nil when the leading schema message length is
+// at most 1 MiB and an arrowio.Reader can be created over track and complete
+// one Read call without error; otherwise it returns the error that caused
+// the input to be rejected. The caller is responsible for resetting track
+// afterwards.
+func isArrowStream(track *Track) error {
+	// Streams created by Arrow 0.15.0 or later begin with a 4-byte
+	// continuation indicator (0xffffffff) followed by a 4-byte
+	// little-endian schema message length. Older streams begin with the
+	// length.
+	buf := make([]byte, 4)
+	if _, err := io.ReadFull(track, buf); err != nil {
+		return err
+	}
+	if string(buf) == "\xff\xff\xff\xff" {
+		// This looks like a continuation indicator. Skip it.
+		if _, err := io.ReadFull(track, buf); err != nil {
+			return err
+		}
+	}
+	if binary.LittleEndian.Uint32(buf) > 1048576 {
+		// Prevent arrowio.NewReader from attempting to read an
+		// unreasonable amount.
+		return errors.New("schema message length exceeds 1 MiB")
+	}
+	// Rewind so the trial reader below sees the stream from its start.
+	track.Reset()
+	// A throwaway context keeps types created during detection out of
+	// the caller's context.
+	zrc, err := arrowio.NewReader(zed.NewContext(), track)
+	if err != nil {
+		return err
+	}
+	defer zrc.Close()
+	// Only the error matters here; the value read is discarded.
+	_, err = zrc.Read()
+	return err
 }
 
 func joinErrs(errs []error) error {
diff --git a/zio/anyio/writer.go b/zio/anyio/writer.go
index 7106dcafd4..6ab94eecb0 100644
--- a/zio/anyio/writer.go
+++ b/zio/anyio/writer.go
@@ -6,6 +6,7 @@ import (
 
 	"github.com/brimdata/zed"
 	"github.com/brimdata/zed/zio"
+	"github.com/brimdata/zed/zio/arrowio"
 	"github.com/brimdata/zed/zio/csvio"
 	"github.com/brimdata/zed/zio/jsonio"
 	"github.com/brimdata/zed/zio/lakeio"
@@ -29,6 +30,8 @@ type WriterOpts struct {
 
 func NewWriter(w io.WriteCloser, opts WriterOpts) (zio.WriteCloser, error) {
 	switch opts.Format {
+	case "arrows":
+		return arrowio.NewWriter(w), nil
 	case "null":
 		return &nullWriter{}, nil
 	case "zng":
diff --git a/zio/anyio/ztests/arrows.yaml b/zio/anyio/ztests/arrows.yaml
new file mode 100644
index 0000000000..6b1f1fcde9
--- /dev/null
+++ b/zio/anyio/ztests/arrows.yaml
@@ -0,0 +1,11 @@
+script: |
+  zq -f arrows - | zq -z -
+
+inputs:
+  - name: stdin
+    data: &stdin |
+      {x:1}
+
+outputs:
+  - name: stdout
+    data: *stdin
diff --git a/zio/arrowio/reader.go b/zio/arrowio/reader.go
new file mode 100644
index 0000000000..93fe211b9f
--- /dev/null
+++ b/zio/arrowio/reader.go
@@ -0,0 +1,467 @@
+package arrowio
+
+import (
+	"fmt"
+	"io"
+	"strconv"
+	"unsafe"
+
+	"github.com/apache/arrow/go/v11/arrow"
+	"github.com/apache/arrow/go/v11/arrow/array"
+	"github.com/apache/arrow/go/v11/arrow/ipc"
+	"github.com/apache/arrow/go/v11/parquet/pqarrow"
+	"github.com/brimdata/zed"
+	"github.com/brimdata/zed/pkg/nano"
+	"github.com/brimdata/zed/zcode"
+	"golang.org/x/exp/slices"
+)
+
+// Reader is a zio.Reader for the Arrow IPC stream format.
+type Reader struct {
+	zctx *zed.Context
+	rr   pqarrow.RecordReader
+
+	// typ is the Zed record type derived from the Arrow schema.
+	typ zed.Type
+	// unionTagMappings maps an Arrow union type's fingerprint to a slice
+	// that translates Arrow child IDs to Zed union tags.
+	unionTagMappings map[string][]int
+
+	// rec is the record batch currently being drained and i is the index
+	// of the next row to return from it.
+	rec arrow.Record
+	i   int
+
+	builder zcode.Builder
+	val     zed.Value
+}
+
+// NewReader returns a Reader that decodes the Arrow IPC stream in r. If the
+// stream's schema cannot be converted to a Zed type, the underlying IPC
+// reader is released before the error is returned.
+func NewReader(zctx *zed.Context, r io.Reader) (*Reader, error) {
+	ipcReader, err := ipc.NewReader(r)
+	if err != nil {
+		return nil, err
+	}
+	ar, err := NewReaderFromRecordReader(zctx, ipcReader)
+	if err != nil {
+		ipcReader.Release()
+		return nil, err
+	}
+	return ar, nil
+}
+
+// NewReaderFromRecordReader returns a Reader that yields one Zed record per
+// row read from rr. The Zed record type is derived from rr's schema after
+// duplicate top-level field names have been made unique.
+func NewReaderFromRecordReader(zctx *zed.Context, rr pqarrow.RecordReader) (*Reader, error) {
+	// Clone so that renaming does not modify the schema's own fields.
+	fields := slices.Clone(rr.Schema().Fields())
+	uniquifyFieldNames(fields)
+	r := &Reader{
+		zctx:             zctx,
+		rr:               rr,
+		unionTagMappings: map[string][]int{},
+	}
+	typ, err := r.newZedType(arrow.StructOf(fields...))
+	if err != nil {
+		return nil, err
+	}
+	r.typ = typ
+	return r, nil
+}
+
+// uniquifyFieldNames renames, in place, every field whose name has already
+// been seen earlier in fields by appending the number of earlier occurrences
+// (the second "a" becomes "a1", the third "a2", and so on).
+//
+// NOTE(review): a generated name is not checked against names that appear
+// literally in the schema (e.g. "a", "a", "a1"); confirm whether that case
+// needs handling.
+func uniquifyFieldNames(fields []arrow.Field) {
+	names := map[string]int{}
+	for i, f := range fields {
+		// f is a copy, so f.Name is still the original name below.
+		if n := names[f.Name]; n > 0 {
+			fields[i].Name += strconv.Itoa(n)
+		}
+		names[f.Name]++
+	}
+}
+
+// Close releases the underlying record reader and any record batch that is
+// still held. It is safe to call more than once and always returns nil.
+func (r *Reader) Close() error {
+	if r.rr != nil {
+		r.rr.Release()
+		r.rr = nil
+	}
+	if r.rec != nil {
+		r.rec.Release()
+		r.rec = nil
+	}
+	return nil
+}
+
+// Read returns the next row as a Zed record, or nil at end of stream. The
+// returned pointer refers to storage inside the Reader that is overwritten
+// by the next call.
+func (r *Reader) Read() (*zed.Value, error) {
+	// Fetch batches until one with at least one row is found.
+	for r.rec == nil {
+		rec, err := r.rr.Read()
+		if err != nil {
+			if err == io.EOF {
+				return nil, nil
+			}
+			return nil, err
+		}
+		if rec.NumRows() > 0 {
+			r.rec = rec
+			r.i = 0
+		} else {
+			rec.Release()
+		}
+	}
+	r.builder.Truncate()
+	for _, col := range r.rec.Columns() {
+		if err := r.buildZcode(col, r.i); err != nil {
+			return nil, err
+		}
+	}
+	r.val = *zed.NewValue(r.typ, r.builder.Bytes())
+	r.i++
+	if r.i >= int(r.rec.NumRows()) {
+		// Batch exhausted; the next call fetches a new one.
+		r.rec.Release()
+		r.rec = nil
+	}
+	return &r.val, nil
+}
+
+// Field layouts of the Zed records used to represent Arrow types that have
+// no direct Zed equivalent.
+//
+// NOTE(review): "milliseconds" is declared uint32 but buildZcode encodes it
+// with zed.EncodeInt; confirm which of the two is intended.
+var dayTimeIntervalFields = []zed.Column{
+	{Name: "days", Type: zed.TypeInt32},
+	{Name: "milliseconds", Type: zed.TypeUint32},
+}
+var decimal128Fields = []zed.Column{
+	{Name: "high", Type: zed.TypeInt64},
+	{Name: "low", Type: zed.TypeUint64},
+}
+var monthDayNanoIntervalFields = []zed.Column{
+	{Name: "month", Type: zed.TypeInt32},
+	{Name: "day", Type: zed.TypeInt32},
+	{Name: "nanoseconds", Type: zed.TypeInt64},
+}
+
+// newZedType returns the Zed type used to represent the Arrow type dt. Arrow
+// types without a direct Zed equivalent become named types whose names start
+// with "arrow_". An error is returned for Arrow types not handled here.
+func (r *Reader) newZedType(dt arrow.DataType) (zed.Type, error) {
+	// Order here follows that of the arrow.Type constants.
+	switch dt.ID() {
+	case arrow.NULL:
+		return zed.TypeNull, nil
+	case arrow.BOOL:
+		return zed.TypeBool, nil
+	case arrow.UINT8:
+		return zed.TypeUint8, nil
+	case arrow.INT8:
+		return zed.TypeInt8, nil
+	case arrow.UINT16:
+		return zed.TypeUint16, nil
+	case arrow.INT16:
+		return zed.TypeInt16, nil
+	case arrow.UINT32:
+		return zed.TypeUint32, nil
+	case arrow.INT32:
+		return zed.TypeInt32, nil
+	case arrow.UINT64:
+		return zed.TypeUint64, nil
+	case arrow.INT64:
+		return zed.TypeInt64, nil
+	case arrow.FLOAT16:
+		return r.zctx.LookupTypeNamed("arrow_float16", zed.TypeFloat32)
+	case arrow.FLOAT32:
+		return zed.TypeFloat32, nil
+	case arrow.FLOAT64:
+		return zed.TypeFloat64, nil
+	case arrow.STRING:
+		return zed.TypeString, nil
+	case arrow.BINARY:
+		return zed.TypeBytes, nil
+	case arrow.FIXED_SIZE_BINARY:
+		width := strconv.Itoa(dt.(*arrow.FixedSizeBinaryType).ByteWidth)
+		return r.zctx.LookupTypeNamed("arrow_fixed_size_binary_"+width, zed.TypeBytes)
+	case arrow.DATE32:
+		return r.zctx.LookupTypeNamed("arrow_date32", zed.TypeTime)
+	case arrow.DATE64:
+		return r.zctx.LookupTypeNamed("arrow_date64", zed.TypeTime)
+	case arrow.TIMESTAMP:
+		if unit := dt.(*arrow.TimestampType).Unit; unit != arrow.Nanosecond {
+			return r.zctx.LookupTypeNamed("arrow_timestamp_"+unit.String(), zed.TypeTime)
+		}
+		return zed.TypeTime, nil
+	case arrow.TIME32:
+		unit := dt.(*arrow.Time32Type).Unit.String()
+		return r.zctx.LookupTypeNamed("arrow_time32_"+unit, zed.TypeTime)
+	case arrow.TIME64:
+		unit := dt.(*arrow.Time64Type).Unit.String()
+		return r.zctx.LookupTypeNamed("arrow_time64_"+unit, zed.TypeTime)
+	case arrow.INTERVAL_MONTHS:
+		return r.zctx.LookupTypeNamed("arrow_month_interval", zed.TypeInt32)
+	case arrow.INTERVAL_DAY_TIME:
+		typ, err := r.zctx.LookupTypeRecord(dayTimeIntervalFields)
+		if err != nil {
+			return nil, err
+		}
+		return r.zctx.LookupTypeNamed("arrow_day_time_interval", typ)
+	case arrow.DECIMAL128:
+		typ, err := r.zctx.LookupTypeRecord(decimal128Fields)
+		if err != nil {
+			return nil, err
+		}
+		return r.zctx.LookupTypeNamed("arrow_decimal128", typ)
+	case arrow.DECIMAL256:
+		return r.zctx.LookupTypeNamed("arrow_decimal256", r.zctx.LookupTypeArray(zed.TypeUint64))
+	case arrow.LIST:
+		typ, err := r.newZedType(dt.(*arrow.ListType).Elem())
+		if err != nil {
+			return nil, err
+		}
+		return r.zctx.LookupTypeArray(typ), nil
+	case arrow.STRUCT:
+		var fields []zed.Column
+		for _, f := range dt.(*arrow.StructType).Fields() {
+			typ, err := r.newZedType(f.Type)
+			if err != nil {
+				return nil, err
+			}
+			fields = append(fields, zed.NewColumn(f.Name, typ))
+		}
+		return r.zctx.LookupTypeRecord(fields)
+	case arrow.SPARSE_UNION, arrow.DENSE_UNION:
+		return r.newZedUnionType(dt.(arrow.UnionType), dt.Fingerprint())
+	case arrow.DICTIONARY:
+		// Dictionary encoding is dropped; only the value type is kept.
+		return r.newZedType(dt.(*arrow.DictionaryType).ValueType)
+	case arrow.MAP:
+		keyType, err := r.newZedType(dt.(*arrow.MapType).KeyType())
+		if err != nil {
+			return nil, err
+		}
+		itemType, err := r.newZedType(dt.(*arrow.MapType).ItemType())
+		if err != nil {
+			return nil, err
+		}
+		return r.zctx.LookupTypeMap(keyType, itemType), nil
+	case arrow.FIXED_SIZE_LIST:
+		typ, err := r.newZedType(dt.(*arrow.FixedSizeListType).Elem())
+		if err != nil {
+			return nil, err
+		}
+		size := strconv.Itoa(int(dt.(*arrow.FixedSizeListType).Len()))
+		return r.zctx.LookupTypeNamed("arrow_fixed_size_list_"+size, r.zctx.LookupTypeArray(typ))
+	case arrow.DURATION:
+		if unit := dt.(*arrow.DurationType).Unit; unit != arrow.Nanosecond {
+			return r.zctx.LookupTypeNamed("arrow_duration_"+unit.String(), zed.TypeDuration)
+		}
+		return zed.TypeDuration, nil
+	case arrow.LARGE_STRING:
+		return r.zctx.LookupTypeNamed("arrow_large_string", zed.TypeString)
+	case arrow.LARGE_BINARY:
+		return r.zctx.LookupTypeNamed("arrow_large_binary", zed.TypeBytes)
+	case arrow.LARGE_LIST:
+		typ, err := r.newZedType(dt.(*arrow.LargeListType).Elem())
+		if err != nil {
+			return nil, err
+		}
+		return r.zctx.LookupTypeNamed("arrow_large_list", r.zctx.LookupTypeArray(typ))
+	case arrow.INTERVAL_MONTH_DAY_NANO:
+		typ, err := r.zctx.LookupTypeRecord(monthDayNanoIntervalFields)
+		if err != nil {
+			return nil, err
+		}
+		return r.zctx.LookupTypeNamed("arrow_month_day_nano_interval", typ)
+	default:
+		return nil, fmt.Errorf("unimplemented Arrow type: %s", dt.Name())
+	}
+}
+
+// newZedUnionType returns the Zed union type for the Arrow union type union
+// and records, under fingerprint, a slice that maps each Arrow child index
+// to the position of its Zed type within the Zed union. Several Arrow
+// children may map to the same position when they convert to the same Zed
+// type.
+func (r *Reader) newZedUnionType(union arrow.UnionType, fingerprint string) (zed.Type, error) {
+	var types []zed.Type
+	for _, f := range union.Fields() {
+		typ, err := r.newZedType(f.Type)
+		if err != nil {
+			return nil, err
+		}
+		types = append(types, typ)
+	}
+	// Clone because types must keep the Arrow child order for the
+	// mapping built below.
+	uniqueTypes := zed.UniqueTypes(slices.Clone(types))
+	var x []int
+Loop:
+	for _, typ2 := range types {
+		for i, typ := range uniqueTypes {
+			if typ == typ2 {
+				x = append(x, i)
+				continue Loop
+			}
+		}
+	}
+	r.unionTagMappings[fingerprint] = x
+	return r.zctx.LookupTypeUnion(uniqueTypes), nil
+}
+
+// buildZcode appends the value at index i of the Arrow array a to r.builder,
+// using the encoding that matches the Zed type newZedType returns for a's
+// data type. A null slot is appended as a nil value. An error is returned
+// for Arrow types not handled here.
+func (r *Reader) buildZcode(a arrow.Array, i int) error {
+	b := &r.builder
+	if a.IsNull(i) {
+		b.Append(nil)
+		return nil
+	}
+	data := a.Data()
+	// XXX Calling array.New*Data once per value (rather than once
+	// per arrow.Array) is slow.
+	//
+	// Order here follows that of the arrow.Type constants.
+	switch a.DataType().ID() {
+	case arrow.NULL:
+		b.Append(nil)
+	case arrow.BOOL:
+		b.Append(zed.EncodeBool(array.NewBooleanData(data).Value(i)))
+	case arrow.UINT8:
+		b.Append(zed.EncodeUint(uint64(array.NewUint8Data(data).Value(i))))
+	case arrow.INT8:
+		b.Append(zed.EncodeInt(int64(array.NewInt8Data(data).Value(i))))
+	case arrow.UINT16:
+		b.Append(zed.EncodeUint(uint64(array.NewUint16Data(data).Value(i))))
+	case arrow.INT16:
+		b.Append(zed.EncodeInt(int64(array.NewInt16Data(data).Value(i))))
+	case arrow.UINT32:
+		b.Append(zed.EncodeUint(uint64(array.NewUint32Data(data).Value(i))))
+	case arrow.INT32:
+		b.Append(zed.EncodeInt(int64(array.NewInt32Data(data).Value(i))))
+	case arrow.UINT64:
+		b.Append(zed.EncodeUint(array.NewUint64Data(data).Value(i)))
+	case arrow.INT64:
+		b.Append(zed.EncodeInt(array.NewInt64Data(data).Value(i)))
+	case arrow.FLOAT16:
+		b.Append(zed.EncodeFloat32(array.NewFloat16Data(data).Value(i).Float32()))
+	case arrow.FLOAT32:
+		b.Append(zed.EncodeFloat32(array.NewFloat32Data(data).Value(i)))
+	case arrow.FLOAT64:
+		b.Append(zed.EncodeFloat64(array.NewFloat64Data(data).Value(i)))
+	case arrow.STRING:
+		appendString(b, array.NewStringData(data).Value(i))
+	case arrow.BINARY:
+		b.Append(zed.EncodeBytes(array.NewBinaryData(data).Value(i)))
+	case arrow.FIXED_SIZE_BINARY:
+		b.Append(zed.EncodeBytes(array.NewFixedSizeBinaryData(data).Value(i)))
+	case arrow.DATE32:
+		b.Append(zed.EncodeTime(nano.TimeToTs(array.NewDate32Data(data).Value(i).ToTime())))
+	case arrow.DATE64:
+		b.Append(zed.EncodeTime(nano.TimeToTs(array.NewDate64Data(data).Value(i).ToTime())))
+	case arrow.TIMESTAMP:
+		unit := a.DataType().(*arrow.TimestampType).Unit
+		b.Append(zed.EncodeTime(nano.TimeToTs(array.NewTimestampData(data).Value(i).ToTime(unit))))
+	case arrow.TIME32:
+		unit := a.DataType().(*arrow.Time32Type).Unit
+		b.Append(zed.EncodeTime(nano.TimeToTs(array.NewTime32Data(data).Value(i).ToTime(unit))))
+	case arrow.TIME64:
+		unit := a.DataType().(*arrow.Time64Type).Unit
+		b.Append(zed.EncodeTime(nano.TimeToTs(array.NewTime64Data(data).Value(i).ToTime(unit))))
+	case arrow.INTERVAL_MONTHS:
+		b.Append(zed.EncodeInt(int64(array.NewMonthIntervalData(data).Value(i))))
+	case arrow.INTERVAL_DAY_TIME:
+		v := array.NewDayTimeIntervalData(data).Value(i)
+		b.BeginContainer()
+		b.Append(zed.EncodeInt(int64(v.Days)))
+		b.Append(zed.EncodeInt(int64(v.Milliseconds)))
+		b.EndContainer()
+	case arrow.DECIMAL128:
+		v := array.NewDecimal128Data(data).Value(i)
+		b.BeginContainer()
+		b.Append(zed.EncodeInt(v.HighBits()))
+		b.Append(zed.EncodeUint(v.LowBits()))
+		b.EndContainer()
+	case arrow.DECIMAL256:
+		b.BeginContainer()
+		for _, u := range array.NewDecimal256Data(data).Value(i).Array() {
+			b.Append(zed.EncodeUint(u))
+		}
+		b.EndContainer()
+	case arrow.LIST:
+		v := array.NewListData(data)
+		start, end := v.ValueOffsets(i)
+		return r.buildZcodeList(v.ListValues(), int(start), int(end))
+	case arrow.STRUCT:
+		v := array.NewStructData(data)
+		b.BeginContainer()
+		for j := 0; j < v.NumField(); j++ {
+			if err := r.buildZcode(v.Field(j), i); err != nil {
+				return err
+			}
+		}
+		b.EndContainer()
+	case arrow.SPARSE_UNION:
+		return r.buildZcodeUnion(array.NewSparseUnionData(data), data.DataType(), i)
+	case arrow.DENSE_UNION:
+		return r.buildZcodeUnion(array.NewDenseUnionData(data), data.DataType(), i)
+	case arrow.DICTIONARY:
+		v := array.NewDictionaryData(data)
+		return r.buildZcode(v.Dictionary(), v.GetValueIndex(i))
+	case arrow.MAP:
+		v := array.NewMapData(data)
+		keys, items := v.Keys(), v.Items()
+		b.BeginContainer()
+		for j, end := v.ValueOffsets(i); j < end; j++ {
+			if err := r.buildZcode(keys, int(j)); err != nil {
+				return err
+			}
+			if err := r.buildZcode(items, int(j)); err != nil {
+				return err
+			}
+		}
+		b.TransformContainer(zed.NormalizeMap)
+		b.EndContainer()
+	case arrow.FIXED_SIZE_LIST:
+		// Every slot holds exactly n child values, so slot i occupies
+		// child positions [(offset+i)*n, (offset+i+1)*n). Using the
+		// array length here instead would emit the wrong elements for
+		// every row.
+		v := array.NewFixedSizeListData(data)
+		n := int(a.DataType().(*arrow.FixedSizeListType).Len())
+		start := (data.Offset() + i) * n
+		return r.buildZcodeList(v.ListValues(), start, start+n)
+	case arrow.DURATION:
+		// Scale to nanoseconds; nanosecond durations pass through.
+		d := nano.Duration(array.NewDurationData(data).Value(i))
+		switch a.DataType().(*arrow.DurationType).Unit {
+		case arrow.Second:
+			d *= nano.Second
+		case arrow.Millisecond:
+			d *= nano.Millisecond
+		case arrow.Microsecond:
+			d *= nano.Microsecond
+		}
+		b.Append(zed.EncodeDuration(d))
+	case arrow.LARGE_STRING:
+		appendString(b, array.NewLargeStringData(data).Value(i))
+	case arrow.LARGE_BINARY:
+		b.Append(zed.EncodeBytes(array.NewLargeBinaryData(data).Value(i)))
+	case arrow.LARGE_LIST:
+		v := array.NewLargeListData(data)
+		start, end := v.ValueOffsets(i)
+		return r.buildZcodeList(v.ListValues(), int(start), int(end))
+	case arrow.INTERVAL_MONTH_DAY_NANO:
+		v := array.NewMonthDayNanoIntervalData(data).Value(i)
+		b.BeginContainer()
+		b.Append(zed.EncodeInt(int64(v.Months)))
+		b.Append(zed.EncodeInt(int64(v.Days)))
+		b.Append(zed.EncodeInt(int64(v.Nanoseconds)))
+		b.EndContainer()
+	default:
+		return fmt.Errorf("unimplemented Arrow type: %s", a.DataType().Name())
+	}
+	return nil
+}
+
+// buildZcodeList appends a container holding the values of a at indexes
+// start (inclusive) through end (exclusive).
+func (r *Reader) buildZcodeList(a arrow.Array, start, end int) error {
+	r.builder.BeginContainer()
+	for i := start; i < end; i++ {
+		if err := r.buildZcode(a, i); err != nil {
+			return err
+		}
+	}
+	r.builder.EndContainer()
+	return nil
+}
+
+// buildZcodeUnion appends the value at index i of the union array u as a
+// container holding the Zed union tag followed by the value, or as a nil
+// value when the selected child is null at that position. dt must be the
+// union's data type; its fingerprint selects the tag mapping recorded by
+// newZedUnionType.
+func (r *Reader) buildZcodeUnion(u array.Union, dt arrow.DataType, i int) error {
+	childID := u.ChildID(i)
+	if u, ok := u.(*array.DenseUnion); ok {
+		// Dense unions index each child through a per-slot offset.
+		i = int(u.ValueOffset(i))
+	}
+	b := &r.builder
+	if field := u.Field(childID); field.IsNull(i) {
+		b.Append(nil)
+	} else {
+		b.BeginContainer()
+		b.Append(zed.EncodeInt(int64(r.unionTagMappings[dt.Fingerprint()][childID])))
+		if err := r.buildZcode(field, i); err != nil {
+			return err
+		}
+		b.EndContainer()
+	}
+	return nil
+}
+
+// appendString appends s to b as a Zed string without copying s into a new
+// byte slice first.
+//
+// NOTE(review): the cast below reinterprets a string header as a slice
+// header, so the resulting slice's capacity is not meaningful; this relies
+// on b.Append only reading len bytes. Confirm, or switch to unsafe.Slice
+// with unsafe.StringData if the module's Go version allows it.
+func appendString(b *zcode.Builder, s string) {
+	if s == "" {
+		b.Append(zed.EncodeString(s))
+	} else {
+		// Avoid a call to runtime.stringtoslicebyte.
+		b.Append(*(*[]byte)(unsafe.Pointer(&s)))
+	}
+}
diff --git a/zio/arrowio/writer.go b/zio/arrowio/writer.go
new file mode 100644
index 0000000000..cc66c2af40
--- /dev/null
+++ b/zio/arrowio/writer.go
@@ -0,0 +1,471 @@
+package arrowio
+
+import (
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"io"
+	"math"
+	"strconv"
+	"strings"
+
+	"github.com/apache/arrow/go/v11/arrow"
+	"github.com/apache/arrow/go/v11/arrow/array"
+	"github.com/apache/arrow/go/v11/arrow/decimal128"
+	"github.com/apache/arrow/go/v11/arrow/decimal256"
+	"github.com/apache/arrow/go/v11/arrow/float16"
+	"github.com/apache/arrow/go/v11/arrow/ipc"
+	"github.com/apache/arrow/go/v11/arrow/memory"
+	"github.com/brimdata/zed"
+	"github.com/brimdata/zed/pkg/nano"
+	"github.com/brimdata/zed/zcode"
+	"github.com/brimdata/zed/zson"
+	"golang.org/x/exp/slices"
+)
+
+var (
+	ErrMultipleTypes   = errors.New("arrowio: encountered multiple types (consider 'fuse')")
+	ErrNotRecord       = errors.New("arrowio: not a record")
+	ErrUnsupportedType = errors.New("arrowio: unsupported type")
+)
+
+// Writer is a zio.Writer for the Arrow IPC stream format.  Given Zed values
+// with appropriately named types (see the newArrowDataType implementation), it
+// can write all Arrow types except dictionaries and sparse unions.  (Although
+// dictionaries are not part of the Zed data model, write support could be added
+// using a named type.)
+type Writer struct {
+	w       io.WriteCloser
+	writer  *ipc.Writer
+	builder *array.RecordBuilder
+	// unionTagMappings maps a Zed union type to a slice that translates
+	// Zed union tags to Arrow union type codes.
+	unionTagMappings map[zed.Type][]int
+	// typ is the record type of the first value written; it stays nil
+	// until the Arrow schema, builder, and IPC writer have been created.
+	typ *zed.TypeRecord
+}
+
+// NewWriter returns a Writer that writes an Arrow IPC stream to w. The
+// schema is taken from the first value passed to Write.
+func NewWriter(w io.WriteCloser) *Writer {
+	return &Writer{w: w, unionTagMappings: map[zed.Type][]int{}}
+}
+
+// Close flushes any buffered rows, closes the IPC writer if one was created,
+// and closes the underlying io.WriteCloser. It returns the first error
+// encountered.
+func (w *Writer) Close() error {
+	var err error
+	if w.writer != nil {
+		// A minimum of 1 flushes whatever rows remain.
+		err = w.flush(1)
+		w.builder.Release()
+		if err2 := w.writer.Close(); err == nil {
+			err = err2
+		}
+		w.writer = nil
+	}
+	if err2 := w.w.Close(); err == nil {
+		err = err2
+	}
+	return err
+}
+
+// recordBatchSize is the number of rows buffered before a record batch is
+// written.
+const recordBatchSize = 1024
+
+// Write appends val to the current record batch and writes the batch once it
+// holds recordBatchSize rows. It returns an error wrapping ErrNotRecord if
+// val is not a record, ErrMultipleTypes if val's record type differs from
+// that of the first value written, or ErrUnsupportedType if the first
+// value's type cannot be represented in Arrow.
+func (w *Writer) Write(val *zed.Value) error {
+	recType, ok := zed.TypeUnder(val.Type).(*zed.TypeRecord)
+	if !ok {
+		return fmt.Errorf("%w: %s", ErrNotRecord, zson.MustFormatValue(val))
+	}
+	if w.typ == nil {
+		dt, err := w.newArrowDataType(recType)
+		if err != nil {
+			return err
+		}
+		schema := arrow.NewSchema(dt.(*arrow.StructType).Fields(), nil)
+		w.builder = array.NewRecordBuilder(memory.DefaultAllocator, schema)
+		w.builder.Reserve(recordBatchSize)
+		w.writer = ipc.NewWriter(w.w, ipc.WithSchema(schema))
+		// Record the type only after initialization has succeeded so
+		// that a failed first Write does not leave the Writer with a
+		// type but no builder, which would make the next Write
+		// dereference a nil builder.
+		w.typ = recType
+	} else if w.typ != recType {
+		return fmt.Errorf("%w: %s and %s", ErrMultipleTypes, zson.FormatType(w.typ), zson.FormatType(recType))
+	}
+	it := val.Bytes.Iter()
+	for i, builder := range w.builder.Fields() {
+		var b zcode.Bytes
+		if it != nil {
+			b = it.Next()
+		}
+		w.buildArrowValue(builder, recType.Columns[i].Type, b)
+	}
+	return w.flush(recordBatchSize)
+}
+
+// flush writes the buffered rows as one record batch if at least min rows
+// are buffered; otherwise it does nothing.
+func (w *Writer) flush(min int) error {
+	// All columns hold the same number of rows, so the first is enough.
+	if w.builder.Field(0).Len() < min {
+		return nil
+	}
+	rec := w.builder.NewRecord()
+	defer rec.Release()
+	w.builder.Reserve(recordBatchSize)
+	return w.writer.Write(rec)
+}
+
+// newArrowDataType returns the Arrow data type used to write values of the
+// Zed type typ. When typ is a named type, its name selects among the Arrow
+// types that share the same underlying Zed type (for example,
+// "arrow_date32" on a time). Union types additionally record their tag
+// mapping in w.unionTagMappings. An error wrapping ErrUnsupportedType is
+// returned for types that cannot be represented.
+func (w *Writer) newArrowDataType(typ zed.Type) (arrow.DataType, error) {
+	var name string
+	if n, ok := typ.(*zed.TypeNamed); ok {
+		name = n.Name
+		typ = zed.TypeUnder(n.Type)
+	}
+	// Order here follows that of the zed.ID* and zed.TypeValue* constants.
+	switch typ := typ.(type) {
+	case *zed.TypeOfUint8:
+		return arrow.PrimitiveTypes.Uint8, nil
+	case *zed.TypeOfUint16:
+		return arrow.PrimitiveTypes.Uint16, nil
+	case *zed.TypeOfUint32:
+		return arrow.PrimitiveTypes.Uint32, nil
+	case *zed.TypeOfUint64:
+		return arrow.PrimitiveTypes.Uint64, nil
+	case *zed.TypeOfInt8:
+		return arrow.PrimitiveTypes.Int8, nil
+	case *zed.TypeOfInt16:
+		return arrow.PrimitiveTypes.Int16, nil
+	case *zed.TypeOfInt32:
+		if name == "arrow_month_interval" {
+			return arrow.FixedWidthTypes.MonthInterval, nil
+		}
+		return arrow.PrimitiveTypes.Int32, nil
+	case *zed.TypeOfInt64:
+		return arrow.PrimitiveTypes.Int64, nil
+	case *zed.TypeOfDuration:
+		switch name {
+		case "arrow_duration_s":
+			return arrow.FixedWidthTypes.Duration_s, nil
+		case "arrow_duration_ms":
+			return arrow.FixedWidthTypes.Duration_ms, nil
+		case "arrow_duration_us":
+			return arrow.FixedWidthTypes.Duration_us, nil
+		case "arrow_day_time_interval":
+			// NOTE(review): the record case below also handles
+			// this name; confirm a duration with this name is
+			// really meant to become a day-time interval.
+			return arrow.FixedWidthTypes.DayTimeInterval, nil
+		}
+		return arrow.FixedWidthTypes.Duration_ns, nil
+	case *zed.TypeOfTime:
+		switch name {
+		case "arrow_date32":
+			return arrow.FixedWidthTypes.Date32, nil
+		case "arrow_date64":
+			return arrow.FixedWidthTypes.Date64, nil
+		case "arrow_timestamp_s":
+			return arrow.FixedWidthTypes.Timestamp_s, nil
+		case "arrow_timestamp_ms":
+			return arrow.FixedWidthTypes.Timestamp_ms, nil
+		case "arrow_timestamp_us":
+			return arrow.FixedWidthTypes.Timestamp_us, nil
+		case "arrow_time32_s":
+			return arrow.FixedWidthTypes.Time32s, nil
+		case "arrow_time32_ms":
+			return arrow.FixedWidthTypes.Time32ms, nil
+		case "arrow_time64_us":
+			return arrow.FixedWidthTypes.Time64us, nil
+		case "arrow_time64_ns":
+			return arrow.FixedWidthTypes.Time64ns, nil
+		}
+		return arrow.FixedWidthTypes.Timestamp_ns, nil
+	case *zed.TypeOfFloat32:
+		if name == "arrow_float16" {
+			return arrow.FixedWidthTypes.Float16, nil
+		}
+		return arrow.PrimitiveTypes.Float32, nil
+	case *zed.TypeOfFloat64:
+		return arrow.PrimitiveTypes.Float64, nil
+	case *zed.TypeOfBool:
+		return arrow.FixedWidthTypes.Boolean, nil
+	case *zed.TypeOfBytes:
+		const prefix = "arrow_fixed_size_binary_"
+		switch {
+		case strings.HasPrefix(name, prefix):
+			// A non-numeric suffix falls through to plain binary.
+			if width, err := strconv.Atoi(strings.TrimPrefix(name, prefix)); err == nil {
+				return &arrow.FixedSizeBinaryType{ByteWidth: width}, nil
+			}
+		case name == "arrow_large_binary":
+			return arrow.BinaryTypes.LargeBinary, nil
+		}
+		return arrow.BinaryTypes.Binary, nil
+	case *zed.TypeOfString:
+		if name == "arrow_large_string" {
+			return arrow.BinaryTypes.LargeString, nil
+		}
+		return arrow.BinaryTypes.String, nil
+	case *zed.TypeOfIP, *zed.TypeOfNet, *zed.TypeOfType:
+		return arrow.BinaryTypes.String, nil
+	case *zed.TypeOfNull:
+		return arrow.Null, nil
+	case *zed.TypeRecord:
+		if len(typ.Columns) == 0 {
+			return nil, fmt.Errorf("%w: empty record", ErrUnsupportedType)
+		}
+		// A special name only applies when the record also has the
+		// exact field layout the reader produces for that Arrow type.
+		switch name {
+		case "arrow_day_time_interval":
+			if fieldsEqual(typ.Columns, dayTimeIntervalFields) {
+				return arrow.FixedWidthTypes.DayTimeInterval, nil
+			}
+		case "arrow_decimal128":
+			if fieldsEqual(typ.Columns, decimal128Fields) {
+				return &arrow.Decimal128Type{}, nil
+			}
+		case "arrow_month_day_nano_interval":
+			if fieldsEqual(typ.Columns, monthDayNanoIntervalFields) {
+				return arrow.FixedWidthTypes.MonthDayNanoInterval, nil
+			}
+		}
+		var fields []arrow.Field
+		for _, field := range typ.Columns {
+			dt, err := w.newArrowDataType(field.Type)
+			if err != nil {
+				return nil, err
+			}
+			fields = append(fields, arrow.Field{
+				Name:     field.Name,
+				Type:     dt,
+				Nullable: true,
+			})
+		}
+		return arrow.StructOf(fields...), nil
+	case *zed.TypeArray, *zed.TypeSet:
+		dt, err := w.newArrowDataType(zed.InnerType(typ))
+		if err != nil {
+			return nil, err
+		}
+		const prefix = "arrow_fixed_size_list_"
+		switch {
+		case strings.HasPrefix(name, prefix):
+			if n, err := strconv.Atoi(strings.TrimPrefix(name, prefix)); err == nil {
+				return arrow.FixedSizeListOf(int32(n), dt), nil
+			}
+		case name == "arrow_decimal256":
+			if inner := zed.InnerType(typ); inner == zed.TypeUint64 {
+				return &arrow.Decimal256Type{}, nil
+			}
+		case name == "arrow_large_list":
+			return arrow.LargeListOf(dt), nil
+		}
+		return arrow.ListOf(dt), nil
+	case *zed.TypeMap:
+		keyDT, err := w.newArrowDataType(typ.KeyType)
+		if err != nil {
+			return nil, err
+		}
+		valDT, err := w.newArrowDataType(typ.ValType)
+		if err != nil {
+			return nil, err
+		}
+		return arrow.MapOf(keyDT, valDT), nil
+	case *zed.TypeUnion:
+		if len(typ.Types) > math.MaxUint8 {
+			return nil, fmt.Errorf("%w: union with more than %d fields", ErrUnsupportedType, math.MaxUint8)
+		}
+		var fields []arrow.Field
+		var typeCodes []arrow.UnionTypeCode
+		// mapping[tag] is the Arrow type code for Zed union tag tag.
+		var mapping []int
+		for _, typ := range typ.Types {
+			dt, err := w.newArrowDataType(typ)
+			if err != nil {
+				return nil, err
+			}
+			// Zed types that map to the same Arrow type share one
+			// Arrow union child.
+			if j := slices.IndexFunc(fields, func(f arrow.Field) bool { return arrow.TypeEqual(f.Type, dt) }); j > -1 {
+				mapping = append(mapping, j)
+				continue
+			}
+			typeCode := len(typeCodes)
+			if typeCode > math.MaxInt8 {
+				// Arrow union type codes are 8-bit signed, so a
+				// larger code would wrap to a negative value.
+				return nil, fmt.Errorf("%w: union with more than %d distinct Arrow types", ErrUnsupportedType, math.MaxInt8+1)
+			}
+			fields = append(fields, arrow.Field{
+				Type:     dt,
+				Nullable: true,
+			})
+			typeCodes = append(typeCodes, arrow.UnionTypeCode(typeCode))
+			mapping = append(mapping, typeCode)
+		}
+		w.unionTagMappings[typ] = mapping
+		return arrow.DenseUnionOf(fields, typeCodes), nil
+	case *zed.TypeEnum, *zed.TypeError:
+		return arrow.BinaryTypes.String, nil
+	default:
+		return nil, fmt.Errorf("%w: %s", ErrUnsupportedType, zson.FormatType(typ))
+	}
+}
+
+// fieldsEqual reports whether a and b have the same length and the same
+// field names and types in the same order.
+func fieldsEqual(a, b []zed.Column) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	for i := range a {
+		if a[i].Name != b[i].Name || a[i].Type != b[i].Type {
+			return false
+		}
+	}
+	return true
+}
+
+func (w *Writer) buildArrowValue(b array.Builder, typ zed.Type, bytes zcode.Bytes) {
+	if bytes == nil {
+		b.AppendNull()
+		return
+	}
+	var name string
+	if n, ok := typ.(*zed.TypeNamed); ok {
+		name = n.Name
+		typ = zed.TypeUnder(n.Type)
+	}
+	// Order here follows that of the arrow.Type constants.
+ switch b := b.(type) { + case *array.NullBuilder: + b.AppendNull() + case *array.BooleanBuilder: + b.Append(zed.DecodeBool(bytes)) + case *array.Uint8Builder: + b.Append(uint8(zed.DecodeUint(bytes))) + case *array.Int8Builder: + b.Append(int8(zed.DecodeInt(bytes))) + case *array.Uint16Builder: + b.Append(uint16(zed.DecodeUint(bytes))) + case *array.Int16Builder: + b.Append(int16(zed.DecodeInt(bytes))) + case *array.Uint32Builder: + b.Append(uint32(zed.DecodeUint(bytes))) + case *array.Int32Builder: + b.Append(int32(zed.DecodeInt(bytes))) + case *array.Uint64Builder: + b.Append(zed.DecodeUint(bytes)) + case *array.Int64Builder: + b.Append(zed.DecodeInt(bytes)) + case *array.Float16Builder: + b.Append(float16.New(zed.DecodeFloat32(bytes))) + case *array.Float32Builder: + b.Append(zed.DecodeFloat32(bytes)) + case *array.Float64Builder: + b.Append(zed.DecodeFloat64(bytes)) + case *array.StringBuilder: + switch typ := typ.(type) { + case *zed.TypeOfString: + b.Append(zed.DecodeString(bytes)) + case *zed.TypeOfIP: + b.Append(zed.DecodeIP(bytes).String()) + case *zed.TypeOfNet: + b.Append(zed.DecodeNet(bytes).String()) + case *zed.TypeOfType: + b.Append(zson.FormatTypeValue(bytes)) + case *zed.TypeEnum: + s, err := typ.Symbol(int(zed.DecodeUint(bytes))) + if err != nil { + panic(fmt.Sprintf("decoding %s with bytes %s: %s", zson.FormatType(typ), hex.EncodeToString(bytes), err)) + } + b.Append(s) + case *zed.TypeError: + b.Append(zson.MustFormatValue(zed.NewValue(typ, bytes))) + default: + panic(fmt.Sprintf("unexpected Zed type for StringBuilder: %s", zson.FormatType(typ))) + } + case *array.BinaryBuilder: + b.Append(zed.DecodeBytes(bytes)) + case *array.FixedSizeBinaryBuilder: + b.Append(zed.DecodeBytes(bytes)) + case *array.Date32Builder: + b.Append(arrow.Date32FromTime(zed.DecodeTime(bytes).Time())) + case *array.Date64Builder: + b.Append(arrow.Date64FromTime(zed.DecodeTime(bytes).Time())) + case *array.TimestampBuilder: + ts := zed.DecodeTime(bytes) + switch name { + 
case "arrow_timestamp_s": + ts /= nano.Ts(nano.Second) + case "arrow_timestamp_ms": + ts /= nano.Ts(nano.Millisecond) + case "arrow_timestamp_us": + ts /= nano.Ts(nano.Microsecond) + } + b.Append(arrow.Timestamp(ts)) + case *array.Time32Builder: + ts := zed.DecodeTime(bytes) + switch name { + case "arrow_time32_s": + ts /= nano.Ts(nano.Second) + case "arrow_time32_ms": + ts /= nano.Ts(nano.Millisecond) + default: + panic(fmt.Sprintf("unexpected Zed type name for Time32Builder: %s", zson.FormatType(typ))) + } + b.Append(arrow.Time32(ts)) + case *array.Time64Builder: + ts := zed.DecodeTime(bytes) + if name == "arrow_time64_us" { + ts /= nano.Ts(nano.Microsecond) + } + b.Append(arrow.Time64(ts)) + case *array.MonthIntervalBuilder: + b.Append(arrow.MonthInterval(zed.DecodeInt(bytes))) + case *array.DayTimeIntervalBuilder: + it := bytes.Iter() + b.Append(arrow.DayTimeInterval{ + Days: int32(zed.DecodeInt(it.Next())), + Milliseconds: int32(zed.DecodeInt(it.Next())), + }) + case *array.Decimal128Builder: + it := bytes.Iter() + high := zed.DecodeInt(it.Next()) + low := zed.DecodeUint(it.Next()) + b.Append(decimal128.New(high, low)) + case *array.Decimal256Builder: + it := bytes.Iter() + x4 := zed.DecodeUint(it.Next()) + x3 := zed.DecodeUint(it.Next()) + x2 := zed.DecodeUint(it.Next()) + x1 := zed.DecodeUint(it.Next()) + b.Append(decimal256.New(x1, x2, x3, x4)) + case *array.ListBuilder: + w.buildArrowListValue(b, typ, bytes) + case *array.StructBuilder: + b.Append(true) + it := bytes.Iter() + for i, field := range zed.TypeRecordOf(typ).Columns { + w.buildArrowValue(b.FieldBuilder(i), field.Type, it.Next()) + } + case *array.DenseUnionBuilder: + it := bytes.Iter() + tag := zed.DecodeInt(it.Next()) + typeCode := w.unionTagMappings[typ][tag] + b.Append(arrow.UnionTypeCode(typeCode)) + w.buildArrowValue(b.Child(typeCode), typ.(*zed.TypeUnion).Types[tag], it.Next()) + case *array.MapBuilder: + b.Append(true) + typ := zed.TypeUnder(typ).(*zed.TypeMap) + for it := bytes.Iter(); 
!it.Done(); { + w.buildArrowValue(b.KeyBuilder(), typ.KeyType, it.Next()) + w.buildArrowValue(b.ItemBuilder(), typ.ValType, it.Next()) + } + case *array.FixedSizeListBuilder: + w.buildArrowListValue(b, typ, bytes) + case *array.DurationBuilder: + d := zed.DecodeDuration(bytes) + switch name { + case "arrow_duration_s": + d /= nano.Second + case "arrow_duration_ms": + d /= nano.Millisecond + case "arrow_duration_us": + d /= nano.Microsecond + } + b.Append(arrow.Duration(d)) + case *array.LargeStringBuilder: + b.Append(zed.DecodeString(bytes)) + case *array.LargeListBuilder: + w.buildArrowListValue(b, typ, bytes) + case *array.MonthDayNanoIntervalBuilder: + it := bytes.Iter() + b.Append(arrow.MonthDayNanoInterval{ + Months: int32(zed.DecodeInt(it.Next())), + Days: int32(zed.DecodeInt(it.Next())), + Nanoseconds: zed.DecodeInt(it.Next()), + }) + default: + panic(fmt.Sprintf("unknown builder type %T", b)) + } +} + +func (w *Writer) buildArrowListValue(b array.ListLikeBuilder, typ zed.Type, bytes zcode.Bytes) { + b.Append(true) + for it := bytes.Iter(); !it.Done(); { + w.buildArrowValue(b.ValueBuilder(), zed.InnerType(typ), it.Next()) + } +} diff --git a/zio/arrowio/ztests/roundtrip.yaml b/zio/arrowio/ztests/roundtrip.yaml new file mode 100644 index 0000000000..3ac67eee3d --- /dev/null +++ b/zio/arrowio/ztests/roundtrip.yaml @@ -0,0 +1,81 @@ +script: | + zq -f arrows - | zq -i arrows -Z - + +inputs: + - name: stdin + data: &stdin | + { + null: null, + bool: true, + uint8: 8 (uint8), + int8: -8 (int8), + uint16: 16 (uint16), + int16: -16 (int16), + uint32: 32 (uint32), + int32: -32 (int32), + uint64: 64 (uint64), + int64: -64, + float16: 16. (arrow_float16=float32), + float32: 32. 
(float32), + float64: 64., + string: "", + binary: 0x00, + fixed_size_binary: 0x0102 (=arrow_fixed_size_binary_2), + date32: 2022-12-04T00:00:00Z (=arrow_date32), + date64: 2022-12-04T00:00:00Z (=arrow_date64), + timestamp_s: 2022-12-04T19:43:48Z (=arrow_timestamp_s), + timestamp_ms: 2022-12-04T19:43:48.123Z (=arrow_timestamp_ms), + timestamp_us: 2022-12-04T19:43:48.123456Z (=arrow_timestamp_us), + timestamp_ns: 2022-12-04T19:43:48.123456789Z, + time32_s: 1970-01-01T19:43:48Z (=arrow_time32_s), + time32_ms: 1970-01-01T19:43:48.123Z (=arrow_time32_ms), + time64_us: 1970-01-01T19:43:48.123456Z (=arrow_time64_us), + time64_ns: 1970-01-01T19:43:48.123456789Z (=arrow_time64_ns), + month_interval: -12 (arrow_month_interval=int32), + day_time_interval: { + days: 1 (int32), + milliseconds: 2 (uint32) + } (=arrow_day_time_interval), + decimal128: { + high: 1, + low: 2 (uint64) + } (=arrow_decimal128), + decimal256: [ + 1 (uint64), + 2 (uint64), + 3 (uint64), + 4 (uint64) + ] (=arrow_decimal256), + list: [ + 1 + ], + struct: { + a: 0 + }, + union: 1 ((int64,string)), + map: |{ + 1: "one" + }|, + fixed_size_list: [ + 1 + ] (=arrow_fixed_size_list_1), + duration_s: 1s (=arrow_duration_s), + duration_ms: 1ms (=arrow_duration_ms), + duration_us: 1us (=arrow_duration_us), + duration_ns: 1ns, + large_string: "" (=arrow_large_string), + large_binary: 0x00 (=arrow_large_binary), + large_list: [ + 1, + 2 + ] (=arrow_large_list), + month_day_nano_interval: { + month: 1 (int32), + day: 2 (int32), + nanoseconds: 3 + } (=arrow_month_day_nano_interval) + } + +outputs: + - name: stdout + data: *stdin diff --git a/zio/arrowio/ztests/writer-errors.yaml b/zio/arrowio/ztests/writer-errors.yaml new file mode 100644 index 0000000000..824fdd3c40 --- /dev/null +++ b/zio/arrowio/ztests/writer-errors.yaml @@ -0,0 +1,11 @@ +script: | + ! echo '{a:1} {b:2}' | zq -f arrows - + ! echo 1 | zq -f arrows - + ! 
echo {} | zq -f arrows - + +outputs: + - name: stderr + data: | + arrowio: encountered multiple types (consider 'fuse'): {a:int64} and {b:int64} + arrowio: not a record: 1 + arrowio: unsupported type: empty record