Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix "zed dev vcache copy" #4950

Merged
merged 1 commit into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions cmd/zed/dev/vcache/copy/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/brimdata/zed/pkg/charm"
"github.com/brimdata/zed/pkg/storage"
"github.com/brimdata/zed/runtime/vcache"
"github.com/brimdata/zed/zio"
"github.com/segmentio/ksuid"
)

Expand Down Expand Up @@ -66,11 +67,9 @@ func (c *Command) Run(args []string) error {
if err != nil {
return err
}
/*
if err := zio.Copy(writer, object.NewReader()); err != nil {
writer.Close()
return err
}
*/
if err := zio.Copy(writer, object.NewReader()); err != nil {
writer.Close()
return err
}
return writer.Close()
}
11 changes: 5 additions & 6 deletions runtime/vcache/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package vcache

import (
"fmt"
"io"

"github.com/brimdata/zed"
"github.com/brimdata/zed/pkg/field"
Expand All @@ -11,7 +10,7 @@ import (
meta "github.com/brimdata/zed/vng/vector" //XXX rename package
)

func loadArray(any *vector.Any, typ zed.Type, path field.Path, m *meta.Array, r io.ReaderAt) (*vector.Array, error) {
func (l *loader) loadArray(any *vector.Any, typ zed.Type, path field.Path, m *meta.Array) (*vector.Array, error) {
if *any == nil {
var innerType zed.Type
switch typ := typ.(type) {
Expand All @@ -22,15 +21,15 @@ func loadArray(any *vector.Any, typ zed.Type, path field.Path, m *meta.Array, r
default:
return nil, fmt.Errorf("internal error: vcache.loadArray encountered bad type: %s", typ)
}
lengths, err := vng.ReadIntVector(m.Lengths, r)
lengths, err := vng.ReadIntVector(m.Lengths, l.r)
if err != nil {
return nil, err
}
values, err := loadVector(nil, innerType, path, m.Values, r)
if err != nil {
var values vector.Any
if _, err := l.loadVector(&values, innerType, path, m.Values); err != nil {
return nil, err
}
*any = vector.NewArray(typ.(*zed.TypeArray), lengths, values)
*any = vector.NewArray(typ, lengths, values)
}
//XXX always return the array as the vector engine needs to know how to handle
// manipulating the array no matter what it contains
Expand Down
14 changes: 9 additions & 5 deletions runtime/vcache/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,34 @@ package vcache

import (
"fmt"
"io"

"github.com/brimdata/zed"
"github.com/brimdata/zed/pkg/field"
"github.com/brimdata/zed/vector"
"github.com/brimdata/zed/vng"
meta "github.com/brimdata/zed/vng/vector"
)

func loadMap(any *vector.Any, typ zed.Type, path field.Path, m *meta.Map, r io.ReaderAt) (*vector.Map, error) {
func (l *loader) loadMap(any *vector.Any, typ zed.Type, path field.Path, m *meta.Map) (*vector.Map, error) {
if *any == nil {
mapType, ok := typ.(*zed.TypeMap)
if !ok {
return nil, fmt.Errorf("internal error: vcache.loadMap encountered bad type: %s", typ)
}
lengths, err := vng.ReadIntVector(m.Lengths, l.r)
if err != nil {
return nil, err
}
var keys, values vector.Any
_, err := loadVector(&keys, mapType.KeyType, path, m.Keys, r)
_, err = l.loadVector(&keys, mapType.KeyType, path, m.Keys)
if err != nil {
return nil, err
}
_, err = loadVector(&values, mapType.ValType, path, m.Values, r)
_, err = l.loadVector(&values, mapType.ValType, path, m.Values)
if err != nil {
return nil, err
}
*any = vector.NewMap(mapType, keys, values)
*any = vector.NewMap(mapType, lengths, keys, values)
}
return (*any).(*vector.Map), nil
}
26 changes: 15 additions & 11 deletions runtime/vcache/nulls.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,35 @@ import (
meta "github.com/brimdata/zed/vng/vector"
)

func loadNulls(any *vector.Any, typ zed.Type, path field.Path, m *meta.Nulls, r io.ReaderAt) (vector.Any, error) {
func (l *loader) loadNulls(any *vector.Any, typ zed.Type, path field.Path, m *meta.Nulls) (vector.Any, error) {
// The runlengths are typically small so we load them with the metadata
// and don't bother waiting for a reference.
runlens := meta.NewInt64Reader(m.Runs, r) //XXX 32-bit reader?
var off, nulls uint32
null := true
//XXX finish this loop... need to remove slots covered by nulls and subtract
// cumulative number of nulls for each surviving value slot.
runlens := meta.NewInt64Reader(m.Runs, l.r) //XXX 32-bit reader?
var null bool
var off int
var slots []uint32
// In zed, nulls are generally bad and not really needed because we don't
// need super-wide uber schemas with lots of nulls.
for {
//XXX need nullslots array to build vector.Nullmask and need a way to pass down Nullmask XXX
run, err := runlens.Read()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}
off += uint32(run)
if null {
nulls += uint32(run)
for i := 0; int64(i) < run; i++ {
slots = append(slots, uint32(off+i))
}
}
off += int(run)
null = !null
}
//newSlots := slots //XXX need to create this above
return loadVector(any, typ, path, m.Values, r)
var values vector.Any
if _, err := l.loadVector(&values, typ, path, m.Values); err != nil {
return nil, err
}
*any = vector.NewNulls(slots, off, values)
return *any, nil
}
10 changes: 9 additions & 1 deletion runtime/vcache/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,15 @@ func (o *Object) Len() int {
// types in the hiearchy). Load returns a Group for each type and the Group
// may contain multiple vectors.
func (o *Object) Load(typeKey uint32, path field.Path) (vector.Any, error) {
l := loader{o.local, o.reader}
o.mu[typeKey].Lock()
defer o.mu[typeKey].Unlock()
return loadVector(&o.vectors[typeKey], o.typeDict[typeKey], path, o.metas[typeKey], o.reader)
return l.loadVector(&o.vectors[typeKey], o.typeDict[typeKey], path, o.metas[typeKey])
}

func (o *Object) NewReader() *Reader {
return &Reader{
object: o,
builders: make([]vector.Builder, len(o.vectors)),
}
}
131 changes: 49 additions & 82 deletions runtime/vcache/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package vcache

import (
"fmt"
"io"
"net/netip"

"github.com/brimdata/zed"
"github.com/brimdata/zed/vector"
meta "github.com/brimdata/zed/vng/vector"
"github.com/brimdata/zed/zcode"
)

func loadPrimitive(typ zed.Type, m *meta.Primitive, r io.ReaderAt) (vector.Any, error) {
func (l *loader) loadPrimitive(typ zed.Type, m *meta.Primitive) (vector.Any, error) {
// The VNG primitive columns are stored as one big
// list of Zed values. So we can just read the data in
// all at once, compute the byte offsets of each value
Expand All @@ -22,7 +22,7 @@ func loadPrimitive(typ zed.Type, m *meta.Primitive, r io.ReaderAt) (vector.Any,
bytes := make([]byte, n)
var off int
for _, segment := range m.Segmap {
if err := segment.Read(r, bytes[off:]); err != nil {
if err := segment.Read(l.r, bytes[off:]); err != nil {
return nil, err
}
off += int(segment.MemLength)
Expand All @@ -34,103 +34,70 @@ func loadPrimitive(typ zed.Type, m *meta.Primitive, r io.ReaderAt) (vector.Any,
}
bytes = b
}
it := zcode.Iter(bytes)
switch typ := typ.(type) {
case *zed.TypeOfUint8, *zed.TypeOfUint16, *zed.TypeOfUint32, *zed.TypeOfUint64:
//XXX put valcnt in vng meta and use vector allocator
var vals []uint64
var nullslots []uint32
it := zcode.Bytes(bytes).Iter()
var values []uint64
for !it.Done() {
val := it.Next()
if val == nil {
nullslots = append(nullslots, uint32(len(vals)))
vals = append(vals, 0)
} else {
vals = append(vals, zed.DecodeUint(val))
}
values = append(values, zed.DecodeUint(it.Next()))
}
return vector.NewUint(typ, vals, vector.NewNullmask(nullslots, len(vals))), nil
return vector.NewUint(typ, values), nil
case *zed.TypeOfInt8, *zed.TypeOfInt16, *zed.TypeOfInt32, *zed.TypeOfInt64, *zed.TypeOfDuration, *zed.TypeOfTime:
//XXX put valcnt in vng meta and use vector allocator
var vals []int64
var nullslots []uint32
it := zcode.Bytes(bytes).Iter()
var values []int64
for !it.Done() {
val := it.Next()
if val == nil {
nullslots = append(nullslots, uint32(len(vals)))
vals = append(vals, 0)
} else {
vals = append(vals, zed.DecodeInt(val))
}
values = append(values, zed.DecodeInt(it.Next()))
}
return vector.NewInt(typ, vals, vector.NewNullmask(nullslots, len(vals))), nil
case *zed.TypeOfFloat16:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
case *zed.TypeOfFloat32:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
case *zed.TypeOfFloat64:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
return vector.NewInt(typ, values), nil
case *zed.TypeOfFloat16, *zed.TypeOfFloat32, *zed.TypeOfFloat64:
var values []float64
for !it.Done() {
values = append(values, zed.DecodeFloat(it.Next()))
}
return vector.NewFloat(typ, values), nil
case *zed.TypeOfBool:
var vals []bool
var nullslots []uint32
it := zcode.Bytes(bytes).Iter()
var values []bool
for !it.Done() {
val := it.Next()
if val == nil {
nullslots = append(nullslots, uint32(len(vals)))
vals = append(vals, false)
} else {
vals = append(vals, zed.DecodeBool(val))
}
values = append(values, zed.DecodeBool(it.Next()))
}
return vector.NewBool(typ, vals, vector.NewNullmask(nullslots, len(vals))), nil
return vector.NewBool(typ, values), nil
case *zed.TypeOfBytes:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
var values [][]byte
for !it.Done() {
values = append(values, zed.DecodeBytes(it.Next()))
}
return vector.NewBytes(typ, values), nil
case *zed.TypeOfString:
var vals []string
var nullslots []uint32
it := zcode.Bytes(bytes).Iter()
var values []string
for !it.Done() {
val := it.Next()
if val == nil {
nullslots = append(nullslots, uint32(len(vals)))
} else {
vals = append(vals, zed.DecodeString(val))
}
values = append(values, zed.DecodeString(it.Next()))
}
return vector.NewString(typ, vals, vector.NewNullmask(nullslots, len(vals))), nil
return vector.NewString(typ, values), nil
case *zed.TypeOfIP:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
var values []netip.Addr
for !it.Done() {
values = append(values, zed.DecodeIP(it.Next()))
}
return vector.NewIP(typ, values), nil
case *zed.TypeOfNet:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
case *zed.TypeOfNull:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
var values []netip.Prefix
for !it.Done() {
values = append(values, zed.DecodeNet(it.Next()))
}
return vector.NewNet(typ, values), nil
case *zed.TypeOfType:
return nil, fmt.Errorf("vcache.Primitive.Load TBD for %T", typ)
var values []zed.Type
for !it.Done() {
t, err := l.zctx.LookupByValue(it.Next())
if err != nil {
return nil, err
}
values = append(values, t)
}
return vector.NewType(typ, values), nil
case *zed.TypeOfNull:
return vector.NewConst(zed.Null, 0), nil
}
return nil, nil
/*
XXX
if dict := p.meta.Dict; dict != nil {
bytes := p.bytes
return func(b *zcode.Builder) error {
pos := bytes[0]
bytes = bytes[1:]
b.Append(dict[pos].Value.Bytes())
return nil
}, nil
}
it := zcode.Iter(p.bytes)
return func(b *zcode.Builder) error {
b.Append(it.Next())
return nil
}, nil

/* XXX

return nil, fmt.Errorf("internal error: vcache.Primitive.Load uknown type %T", typ)
*/
return nil, fmt.Errorf("internal error: vcache.loadPrimitive got unknown type %#v", typ)
}

type Const struct {
Expand Down
Loading