Skip to content

Commit

Permalink
Fix "zed dev vcache copy" (#4950)
Browse files Browse the repository at this point in the history
Do that by plugging some holes left by #4925.

* In runtime/vcache/ztests/, re-enable skipped "zed dev vcache copy"
  tests.

* In cmd/zed/dev/vcache/copy.Command.Run, resurrect the call to
  zio.Copy.

* In package runtime/vcache:

  * Resurrect the Reader type and update its Read method.

  * Resurrect Object.NewReader.

  * Add loader type to hold the *zed.Context needed when loading
    a vector of type values.

  * Convert existing loadXXX functions to loader methods.

  * In loadArray, fix bad nil argument to loadVector.

  * In loadMap, load lengths vector.

  * In loadNulls, create slots slice and load values vector.

  * In loadPrimitive, remove null handling (vector.Nulls now does
    that) and add support for missing Zed types (float16, float32,
    float64, bytes, ip, net, and type).

  * In loadRecord, load all fields when no specific path is requested.

  * In loadUnion, load tags vector.

  * In loadVector, assign to *any when loading a constant vector.

* In package vector:

  * Add missing Byte, Float, IP, Net, and Type types.

  * Add missing length field to Map type.

  * Add missing tags field to Union type.

  * Implement NewBuilder method for Array, Map, and Union types.

  * Handle nulls for all vector types by replacing the Nullmask
    type with Nulls, which wraps a vector.Any.
  • Loading branch information
nwt authored Dec 22, 2023
1 parent 6be9a88 commit ec5165f
Show file tree
Hide file tree
Showing 31 changed files with 556 additions and 226 deletions.
11 changes: 5 additions & 6 deletions cmd/zed/dev/vcache/copy/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime/vcache"
"github.com/brimdata/zed/zio"
"github.com/segmentio/ksuid"
)

Expand Down Expand Up @@ -66,11 +67,9 @@ func (c *Command) Run(args []string) error {
if err != nil {
return err
}
/*
if err := zio.Copy(writer, object.NewReader()); err != nil {
writer.Close()
return err
}
*/
if err := zio.Copy(writer, object.NewReader()); err != nil {
writer.Close()
return err
}
return writer.Close()
}
11 changes: 5 additions & 6 deletions runtime/vcache/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package vcache

import (
"fmt"
"io"

"github.com/brimdata/zed"
"github.com/brimdata/zed/pkg/field"
Expand All @@ -11,7 +10,7 @@ import (
meta "github.com/brimdata/zed/vng/vector" //XXX rename package
)

func loadArray(any *vector.Any, typ zed.Type, path field.Path, m *meta.Array, r io.ReaderAt) (*vector.Array, error) {
func (l *loader) loadArray(any *vector.Any, typ zed.Type, path field.Path, m *meta.Array) (*vector.Array, error) {
if *any == nil {
var innerType zed.Type
switch typ := typ.(type) {
Expand All @@ -22,15 +21,15 @@ func loadArray(any *vector.Any, typ zed.Type, path field.Path, m *meta.Array, r
default:
return nil, fmt.Errorf("internal error: vcache.loadArray encountered bad type: %s", typ)
}
lengths, err := vng.ReadIntVector(m.Lengths, r)
lengths, err := vng.ReadIntVector(m.Lengths, l.r)
if err != nil {
return nil, err
}
values, err := loadVector(nil, innerType, path, m.Values, r)
if err != nil {
var values vector.Any
if _, err := l.loadVector(&values, innerType, path, m.Values); err != nil {
return nil, err
}
*any = vector.NewArray(typ.(*zed.TypeArray), lengths, values)
*any = vector.NewArray(typ, lengths, values)
}
//XXX always return the array as the vector engine needs to know how to handle
// manipulating the array no matter what it contains
Expand Down
14 changes: 9 additions & 5 deletions runtime/vcache/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,34 @@ package vcache

import (
"fmt"
"io"

"github.com/brimdata/zed"
"github.com/brimdata/zed/pkg/field"
"github.com/brimdata/zed/vector"
"github.com/brimdata/zed/vng"
meta "github.com/brimdata/zed/vng/vector"
)

func loadMap(any *vector.Any, typ zed.Type, path field.Path, m *meta.Map, r io.ReaderAt) (*vector.Map, error) {
func (l *loader) loadMap(any *vector.Any, typ zed.Type, path field.Path, m *meta.Map) (*vector.Map, error) {
if *any == nil {
mapType, ok := typ.(*zed.TypeMap)
if !ok {
return nil, fmt.Errorf("internal error: vcache.loadMap encountered bad type: %s", typ)
}
lengths, err := vng.ReadIntVector(m.Lengths, l.r)
if err != nil {
return nil, err
}
var keys, values vector.Any
_, err := loadVector(&keys, mapType.KeyType, path, m.Keys, r)
_, err = l.loadVector(&keys, mapType.KeyType, path, m.Keys)
if err != nil {
return nil, err
}
_, err = loadVector(&values, mapType.ValType, path, m.Values, r)
_, err = l.loadVector(&values, mapType.ValType, path, m.Values)
if err != nil {
return nil, err
}
*any = vector.NewMap(mapType, keys, values)
*any = vector.NewMap(mapType, lengths, keys, values)
}
return (*any).(*vector.Map), nil
}
26 changes: 15 additions & 11 deletions runtime/vcache/nulls.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,35 @@ import (
meta "github.com/brimdata/zed/vng/vector"
)

func loadNulls(any *vector.Any, typ zed.Type, path field.Path, m *meta.Nulls, r io.ReaderAt) (vector.Any, error) {
func (l *loader) loadNulls(any *vector.Any, typ zed.Type, path field.Path, m *meta.Nulls) (vector.Any, error) {
// The runlengths are typically small so we load them with the metadata
// and don't bother waiting for a reference.
runlens := meta.NewInt64Reader(m.Runs, r) //XXX 32-bit reader?
var off, nulls uint32
null := true
//XXX finish this loop... need to remove slots covered by nulls and subtract
// cumulative number of nulls for each surviving value slot.
runlens := meta.NewInt64Reader(m.Runs, l.r) //XXX 32-bit reader?
var null bool
var off int
var slots []uint32
// In zed, nulls are generally bad and not really needed because we don't
// need super-wide uber schemas with lots of nulls.
for {
//XXX need nullslots array to build vector.Nullmask and need a way to pass down Nullmask XXX
run, err := runlens.Read()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
off += uint32(run)
if null {
nulls += uint32(run)
for i := 0; int64(i) < run; i++ {
slots = append(slots, uint32(off+i))
}
}
off += int(run)
null = !null
}
//newSlots := slots //XXX need to create this above
return loadVector(any, typ, path, m.Values, r)
var values vector.Any
if _, err := l.loadVector(&values, typ, path, m.Values); err != nil {
return nil, err
}
*any = vector.NewNulls(slots, off, values)
return *any, nil
}
10 changes: 9 additions & 1 deletion runtime/vcache/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,15 @@ func (o *Object) Len() int {
// types in the hiearchy). Load returns a Group for each type and the Group
// may contain multiple vectors.
func (o *Object) Load(typeKey uint32, path field.Path) (vector.Any, error) {
l := loader{o.local, o.reader}
o.mu[typeKey].Lock()
defer o.mu[typeKey].Unlock()
return loadVector(&o.vectors[typeKey], o.typeDict[typeKey], path, o.metas[typeKey], o.reader)
return l.loadVector(&o.vectors[typeKey], o.typeDict[typeKey], path, o.metas[typeKey])
}

func (o *Object) NewReader() *Reader {
return &Reader{
object: o,
builders: make([]vector.Builder, len(o.vectors)),
}
}
131 changes: 49 additions & 82 deletions runtime/vcache/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package vcache

import (
"fmt"
"io"
"net/netip"

"github.com/brimdata/zed"
"github.com/brimdata/zed/vector"
meta "github.com/brimdata/zed/vng/vector"
"github.com/brimdata/zed/zcode"
)

func loadPrimitive(typ zed.Type, m *meta.Primitive, r io.ReaderAt) (vector.Any, error) {
func (l *loader) loadPrimitive(typ zed.Type, m *meta.Primitive) (vector.Any, error) {
// The VNG primitive columns are stored as one big
// list of Zed values. So we can just read the data in
// all at once, compute the byte offsets of each value
Expand All @@ -22,7 +22,7 @@ func loadPrimitive(typ zed.Type, m *meta.Primitive, r io.ReaderAt) (vector.Any,
bytes := make([]byte, n)
var off int
for _, segment := range m.Segmap {
if err := segment.Read(r, bytes[off:]); err != nil {
if err := segment.Read(l.r, bytes[off:]); err != nil {
return nil, err
}
off += int(segment.MemLength)
Expand All @@ -34,103 +34,70 @@ func loadPrimitive(typ zed.Type, m *meta.Primitive, r io.ReaderAt) (vector.Any,
}
bytes = b
}
it := zcode.Iter(bytes)
switch typ := typ.(type) {
case *zed.TypeOfUint8, *zed.TypeOfUint16, *zed.TypeOfUint32, *zed.TypeOfUint64:
//XXX put valcnt in vng meta and use vector allocator
var vals []uint64
var nullslots []uint32
it := zcode.Bytes(bytes).Iter()
var values []uint64
for !it.Done() {
val := it.Next()
if val == nil {
nullslots = append(nullslots, uint32(len(vals)))
vals = append(vals, 0)
} else {
vals = append(vals, zed.DecodeUint(val))
}
values = append(values, zed.DecodeUint(it.Next()))
}
return vector.NewUint(typ, vals, vector.NewNullmask(nullslots, len(vals))), nil
return vector.NewUint(typ, values), nil
case *zed.TypeOfInt8, *zed.TypeOfInt16, *zed.TypeOfInt32, *zed.TypeOfInt64, *zed.TypeOfDuration, *zed.TypeOfTime:
//XXX put valcnt in vng meta and use vector allocator
var vals []int64
var nullslots []uint32
it := zcode.Bytes(bytes).Iter()
var values []int64
for !it.Done() {
val := it.Next()
if val == nil {
nullslots = append(nullslots, uint32(len(vals)))
vals = append(vals, 0)
} else {
vals = append(vals, zed.DecodeInt(val))
}
values = append(values, zed.DecodeInt(it.Next()))
}
return vector.NewInt(typ, vals, vector.NewNullmask(nullslots, len(vals))), nil
case *zed.TypeOfFloat16:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
case *zed.TypeOfFloat32:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
case *zed.TypeOfFloat64:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
return vector.NewInt(typ, values), nil
case *zed.TypeOfFloat16, *zed.TypeOfFloat32, *zed.TypeOfFloat64:
var values []float64
for !it.Done() {
values = append(values, zed.DecodeFloat(it.Next()))
}
return vector.NewFloat(typ, values), nil
case *zed.TypeOfBool:
var vals []bool
var nullslots []uint32
it := zcode.Bytes(bytes).Iter()
var values []bool
for !it.Done() {
val := it.Next()
if val == nil {
nullslots = append(nullslots, uint32(len(vals)))
vals = append(vals, false)
} else {
vals = append(vals, zed.DecodeBool(val))
}
values = append(values, zed.DecodeBool(it.Next()))
}
return vector.NewBool(typ, vals, vector.NewNullmask(nullslots, len(vals))), nil
return vector.NewBool(typ, values), nil
case *zed.TypeOfBytes:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
var values [][]byte
for !it.Done() {
values = append(values, zed.DecodeBytes(it.Next()))
}
return vector.NewBytes(typ, values), nil
case *zed.TypeOfString:
var vals []string
var nullslots []uint32
it := zcode.Bytes(bytes).Iter()
var values []string
for !it.Done() {
val := it.Next()
if val == nil {
nullslots = append(nullslots, uint32(len(vals)))
} else {
vals = append(vals, zed.DecodeString(val))
}
values = append(values, zed.DecodeString(it.Next()))
}
return vector.NewString(typ, vals, vector.NewNullmask(nullslots, len(vals))), nil
return vector.NewString(typ, values), nil
case *zed.TypeOfIP:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
var values []netip.Addr
for !it.Done() {
values = append(values, zed.DecodeIP(it.Next()))
}
return vector.NewIP(typ, values), nil
case *zed.TypeOfNet:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
case *zed.TypeOfNull:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
var values []netip.Prefix
for !it.Done() {
values = append(values, zed.DecodeNet(it.Next()))
}
return vector.NewNet(typ, values), nil
case *zed.TypeOfType:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
var values []zed.Type
for !it.Done() {
t, err := l.zctx.LookupByValue(it.Next())
if err != nil {
return nil, err
}
values = append(values, t)
}
return vector.NewType(typ, values), nil
case *zed.TypeOfNull:
return vector.NewConst(zed.Null, 0), nil
}
return nil, nil
/*
XXX
if dict := p.meta.Dict; dict != nil {
bytes := p.bytes
return func(b *zcode.Builder) error {
pos := bytes[0]
bytes = bytes[1:]
b.Append(dict[pos].Value.Bytes())
return nil
}, nil
}
it := zcode.Iter(p.bytes)
return func(b *zcode.Builder) error {
b.Append(it.Next())
return nil
}, nil
/* XXX
return nil, fmt.Errorf("internal error: vcache.Primitive.Load uknown type %T", typ)
*/
return nil, fmt.Errorf("internal error: vcache.loadPrimitive got unknown type %#v", typ)
}

type Const struct {
Expand Down
Loading

0 comments on commit ec5165f

Please sign in to comment.