Skip to content

Commit

Permalink
557 import tool usability (#800)
Browse files Browse the repository at this point in the history
* parquet reader: interface change, fix crash on missing fields

* skip optional, empty fields

* import tool usability+tests

* fix test

* order

* order

* CR fixes + simplifications: remove pointers from InventoryObject, change time field to time.Time

* CR fix: run test once for each format

* CR fix: add default to switch

* add sub-second part of modification time + tests
  • Loading branch information
johnnyaug authored Oct 12, 2020
1 parent 06132ba commit b78e5c6
Show file tree
Hide file tree
Showing 12 changed files with 520 additions and 205 deletions.
2 changes: 1 addition & 1 deletion block/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type InventoryObject struct {
Bucket string
Key string
Size int64
LastModified time.Time
LastModified *time.Time
Checksum string
PhysicalAddress string
}
Expand Down
20 changes: 6 additions & 14 deletions block/s3/inventory_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type InventoryIterator struct {
*Inventory
err error
val *block.InventoryObject
buffer []s3inventory.InventoryObject
buffer []*s3inventory.InventoryObject
inventoryFileIndex int
valIndexInBuffer int
inventoryFileProgress *cmdutils.Progress
Expand Down Expand Up @@ -94,8 +94,7 @@ func (it *InventoryIterator) fillBuffer() bool {
it.logger.Errorf("failed to close manifest file reader. file=%s, err=%w", it.Manifest.Files[it.inventoryFileIndex].Key, err)
}
}()
it.buffer = make([]s3inventory.InventoryObject, rdr.GetNumRows())
err = rdr.Read(&it.buffer)
it.buffer, err = rdr.Read(int(rdr.GetNumRows()))
if err != nil {
it.err = err
return false
Expand All @@ -106,23 +105,16 @@ func (it *InventoryIterator) fillBuffer() bool {
func (it *InventoryIterator) nextFromBuffer() *block.InventoryObject {
for i := it.valIndexInBuffer + 1; i < len(it.buffer); i++ {
obj := it.buffer[i]
if (obj.IsLatest != nil && !*obj.IsLatest) ||
(obj.IsDeleteMarker != nil && *obj.IsDeleteMarker) {
if !obj.IsLatest || obj.IsDeleteMarker {
continue
}
res := block.InventoryObject{
Bucket: obj.Bucket,
Key: obj.Key,
PhysicalAddress: obj.GetPhysicalAddress(),
}
if obj.Size != nil {
res.Size = *obj.Size
}
if obj.LastModifiedMillis != nil {
res.LastModified = time.Unix(*obj.LastModifiedMillis/int64(time.Second/time.Millisecond), 0)
}
if obj.Checksum != nil {
res.Checksum = *obj.Checksum
Size: obj.Size,
LastModified: obj.LastModified,
Checksum: obj.Checksum,
}
it.valIndexInBuffer = i
return &res
Expand Down
25 changes: 11 additions & 14 deletions block/s3/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func rows(keys []string, lastModified map[string]time.Time) []*s3inventory.Inven
if key != "" {
res[i] = new(s3inventory.InventoryObject)
res[i].Key = key
res[i].IsLatest = swag.Bool(!strings.Contains(key, "_expired"))
res[i].IsDeleteMarker = swag.Bool(strings.Contains(key, "_del"))
res[i].IsLatest = !strings.Contains(key, "_expired")
res[i].IsDeleteMarker = strings.Contains(key, "_del")
if lastModified != nil {
res[i].LastModifiedMillis = swag.Int64(lastModified[key].Unix() * 1000)
res[i].LastModified = swag.Time(lastModified[key])
}
}
}
Expand Down Expand Up @@ -196,9 +196,8 @@ func TestIterator(t *testing.T) {
if obj.Key != test.ExpectedObjects[i] {
t.Fatalf("at index %d: expected=%s, got=%s", i, test.ExpectedObjects[i], obj.Key)
}
expectedLastModified := lastModified[obj.Key].Truncate(time.Second)
if obj.LastModified != expectedLastModified {
t.Fatalf("last modified for object in index %d different than expected. expected=%v, got=%v", i, expectedLastModified, obj.LastModified)
if *obj.LastModified != lastModified[obj.Key] {
t.Fatalf("last modified for object in index %d different than expected. expected=%v, got=%v", i, lastModified[obj.Key], obj.LastModified)
}
}
}
Expand Down Expand Up @@ -246,18 +245,16 @@ func (m *mockInventoryFileReader) Close() error {
return nil
}

func (m *mockInventoryFileReader) Read(dstInterface interface{}) error {
res := make([]s3inventory.InventoryObject, 0, len(m.rows))
dst := dstInterface.(*[]s3inventory.InventoryObject)
for i := m.nextIdx; i < len(m.rows) && i < m.nextIdx+len(*dst); i++ {
func (m *mockInventoryFileReader) Read(n int) ([]*s3inventory.InventoryObject, error) {
res := make([]*s3inventory.InventoryObject, 0, len(m.rows))
for i := m.nextIdx; i < len(m.rows) && i < m.nextIdx+n; i++ {
if m.rows[i] == nil {
return ErrReadFile // for test - simulate file with error
return nil, ErrReadFile // for test - simulate file with error
}
res = append(res, *m.rows[i])
res = append(res, m.rows[i])
}
m.nextIdx = m.nextIdx + len(res)
*dst = res
return nil
return res, nil
}

func (m *mockInventoryFileReader) GetNumRows() int64 {
Expand Down
64 changes: 25 additions & 39 deletions cloud/aws/s3inventory/orc_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package s3inventory

import (
"context"
"reflect"
"time"

"github.com/go-openapi/swag"
Expand Down Expand Up @@ -30,7 +29,6 @@ type OrcSelect struct {
}

func getOrcSelect(typeDescription *orc.TypeDescription) *OrcSelect {
relevantFields := []string{"bucket", "key", "size", "last_modified_date", "e_tag", "is_delete_marker", "is_latest"}
res := &OrcSelect{
SelectFields: nil,
IndexInFile: make(map[string]int),
Expand All @@ -40,7 +38,7 @@ func getOrcSelect(typeDescription *orc.TypeDescription) *OrcSelect {
res.IndexInFile[field] = i
}
j := 0
for _, field := range relevantFields {
for _, field := range inventoryFields {
if _, ok := res.IndexInFile[field]; ok {
res.SelectFields = append(res.SelectFields, field)
res.IndexInSelect[field] = j
Expand All @@ -50,45 +48,34 @@ func getOrcSelect(typeDescription *orc.TypeDescription) *OrcSelect {
return res
}

func (r *OrcInventoryFileReader) inventoryObjectFromRow(rowData []interface{}) InventoryObject {
var size *int64
if sizeIdx, ok := r.orcSelect.IndexInSelect["size"]; ok && rowData[sizeIdx] != nil {
size = swag.Int64(rowData[sizeIdx].(int64))
func (r *OrcInventoryFileReader) inventoryObjectFromRow(rowData []interface{}) *InventoryObject {
obj := NewInventoryObject()
obj.Bucket = rowData[r.orcSelect.IndexInSelect[bucketFieldName]].(string)
obj.Key = rowData[r.orcSelect.IndexInSelect[keyFieldName]].(string)
if sizeIdx, ok := r.orcSelect.IndexInSelect[sizeFieldName]; ok && rowData[sizeIdx] != nil {
obj.Size = rowData[sizeIdx].(int64)
}
var lastModifiedMillis *int64
if lastModifiedIdx, ok := r.orcSelect.IndexInSelect["last_modified_date"]; ok && rowData[lastModifiedIdx] != nil {
lastModifiedMillis = swag.Int64(rowData[lastModifiedIdx].(time.Time).UnixNano() / int64(time.Millisecond))
if lastModifiedIdx, ok := r.orcSelect.IndexInSelect[lastModifiedDateFieldName]; ok && rowData[lastModifiedIdx] != nil {
obj.LastModified = swag.Time(rowData[lastModifiedIdx].(time.Time))
}
var eTag *string
if eTagIdx, ok := r.orcSelect.IndexInSelect["e_tag"]; ok && rowData[eTagIdx] != nil {
eTag = swag.String(rowData[eTagIdx].(string))
if eTagIdx, ok := r.orcSelect.IndexInSelect[eTagFieldName]; ok && rowData[eTagIdx] != nil {
obj.Checksum = rowData[eTagIdx].(string)
}
var isLatest *bool
if isLatestIdx, ok := r.orcSelect.IndexInSelect["is_latest"]; ok && rowData[isLatestIdx] != nil {
isLatest = swag.Bool(rowData[isLatestIdx].(bool))
if isLatestIdx, ok := r.orcSelect.IndexInSelect[isLatestFieldName]; ok && rowData[isLatestIdx] != nil {
obj.IsLatest = rowData[isLatestIdx].(bool)
}
var isDeleteMarker *bool
if isDeleteMarkerIdx, ok := r.orcSelect.IndexInSelect["is_delete_marker"]; ok && rowData[isDeleteMarkerIdx] != nil {
isDeleteMarker = swag.Bool(rowData[isDeleteMarkerIdx].(bool))
}
return InventoryObject{
Bucket: rowData[r.orcSelect.IndexInSelect["bucket"]].(string),
Key: rowData[r.orcSelect.IndexInSelect["key"]].(string),
Size: size,
LastModifiedMillis: lastModifiedMillis,
Checksum: eTag,
IsLatest: isLatest,
IsDeleteMarker: isDeleteMarker,
if isDeleteMarkerIdx, ok := r.orcSelect.IndexInSelect[isDeleteMarkerFieldName]; ok && rowData[isDeleteMarkerIdx] != nil {
obj.IsDeleteMarker = rowData[isDeleteMarkerIdx].(bool)
}
return obj
}

func (r *OrcInventoryFileReader) Read(dstInterface interface{}) error {
num := reflect.ValueOf(dstInterface).Elem().Len()
res := make([]InventoryObject, 0, num)
func (r *OrcInventoryFileReader) Read(n int) ([]*InventoryObject, error) {
res := make([]*InventoryObject, 0, n)
for {
select {
case <-r.ctx.Done():
return r.ctx.Err()
return nil, r.ctx.Err()
default:
}
if !r.cursor.Next() {
Expand All @@ -99,14 +86,13 @@ func (r *OrcInventoryFileReader) Read(dstInterface interface{}) error {
break
}
}
res = append(res, r.inventoryObjectFromRow(r.cursor.Row()))
if len(res) == num {
obj := r.inventoryObjectFromRow(r.cursor.Row())
res = append(res, obj)
if len(res) == n {
break
}
}

reflect.ValueOf(dstInterface).Elem().Set(reflect.ValueOf(res))
return nil
return res, nil
}

func (r *OrcInventoryFileReader) GetNumRows() int64 {
Expand All @@ -128,9 +114,9 @@ func (r *OrcInventoryFileReader) Close() error {
}

func (r *OrcInventoryFileReader) FirstObjectKey() string {
return *r.reader.Metadata().StripeStats[0].GetColStats()[r.orcSelect.IndexInFile["key"]+1].StringStatistics.Minimum
return *r.reader.Metadata().StripeStats[0].GetColStats()[r.orcSelect.IndexInFile[keyFieldName]+1].StringStatistics.Minimum
}

func (r *OrcInventoryFileReader) LastObjectKey() string {
return *r.reader.Metadata().StripeStats[0].GetColStats()[r.orcSelect.IndexInFile["key"]+1].StringStatistics.Maximum
return *r.reader.Metadata().StripeStats[0].GetColStats()[r.orcSelect.IndexInFile[keyFieldName]+1].StringStatistics.Maximum
}
108 changes: 104 additions & 4 deletions cloud/aws/s3inventory/parquet_reader.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,120 @@
package s3inventory

import "github.com/xitongsys/parquet-go/reader"
import (
"fmt"
"time"

"github.com/cznic/mathutil"
"github.com/go-openapi/swag"
"github.com/spf13/cast"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/schema"
)

type ParquetInventoryFileReader struct {
reader.ParquetReader
*reader.ParquetReader
nextRow int64
fieldToParquetPath map[string]string
}

// NewParquetInventoryFileReader wraps parquetReader in a ParquetInventoryFileReader.
// It resolves the parquet path of every known inventory field found in the
// reader's schema and returns an error wrapping ErrRequiredFieldNotFound if
// any entry of requiredFields has no matching column.
func NewParquetInventoryFileReader(parquetReader *reader.ParquetReader) (*ParquetInventoryFileReader, error) {
	paths := getParquetPaths(parquetReader.SchemaHandler)
	for _, name := range requiredFields {
		_, found := paths[name]
		if found {
			continue
		}
		// The first missing required field aborts construction.
		return nil, fmt.Errorf("%w: %s", ErrRequiredFieldNotFound, name)
	}
	inventoryReader := &ParquetInventoryFileReader{
		ParquetReader:      parquetReader,
		fieldToParquetPath: paths,
	}
	return inventoryReader, nil
}

// Close stops the embedded parquet reader by calling ReadStop and then closes
// the underlying PFile. The returned error is the one reported by PFile.Close.
func (p *ParquetInventoryFileReader) Close() error {
p.ReadStop()
return p.PFile.Close()
}

// getKeyColumnStatistics returns the statistics of the object-key column in
// the first row group of the file footer. The key column is the first column
// whose schema path ends with "Key"; if no column matches, the statistics of
// the column at index 1 are returned instead.
func (p *ParquetInventoryFileReader) getKeyColumnStatistics() *parquet.Statistics {
	columns := p.Footer.RowGroups[0].Columns
	for idx := range columns {
		column := columns[idx]
		schemaPath := column.MetaData.PathInSchema
		if schemaPath[len(schemaPath)-1] != "Key" {
			continue
		}
		return column.GetMetaData().GetStatistics()
	}
	// No column named "Key" was found: fall back to the second column.
	return columns[1].GetMetaData().GetStatistics()
}
func (p *ParquetInventoryFileReader) FirstObjectKey() string {
return string(p.Footer.RowGroups[0].Columns[0].GetMetaData().GetStatistics().GetMinValue())
return string(p.getKeyColumnStatistics().GetMin())
}

func (p *ParquetInventoryFileReader) LastObjectKey() string {
return string(p.Footer.RowGroups[0].Columns[0].GetMetaData().GetStatistics().GetMaxValue())
return string(p.getKeyColumnStatistics().GetMax())
}

// Read returns the next batch of inventory objects, at most n of them and no
// more than the rows remaining in the file (GetNumRows minus the rows already
// consumed by earlier calls). The reader's row position is advanced by the
// batch size before any column is read.
//
// Each known inventory column is read in turn and its values are written into
// the object at the same index. A value of a non-required field is skipped
// when its entry in the third result of ReadColumnByPath is 0
// (presumably the parquet definition level, i.e. no value present).
// The first column-read or conversion error aborts the call and is returned
// wrapped; no objects are returned in that case.
func (p *ParquetInventoryFileReader) Read(n int) ([]*InventoryObject, error) {
	remaining := p.GetNumRows() - p.nextRow
	count := mathutil.MinInt64(int64(n), remaining)
	p.nextRow += count
	objects := make([]*InventoryObject, count)
	for field, columnPath := range p.fieldToParquetPath {
		values, _, dls, err := p.ReadColumnByPath(columnPath, count)
		if err != nil {
			return nil, fmt.Errorf("failed to read parquet column %s: %w", field, err)
		}
		for row, value := range values {
			// Only required fields, or optional fields that actually carry a value, are applied.
			if isRequired(field) || dls[row] != 0 {
				if objects[row] == nil {
					objects[row] = NewInventoryObject()
				}
				if err := set(objects[row], field, value); err != nil {
					return nil, fmt.Errorf("failed to read parquet column %s: %w", field, err)
				}
			}
		}
	}
	return objects, nil
}

// set converts the raw column value v to the Go type of the inventory field
// named f and stores it in the matching field of o.
//
// The last-modified column is interpreted as milliseconds since the Unix
// epoch and stored as a *time.Time that keeps the sub-second part.
//
// It returns the conversion error from the cast package, or an error wrapping
// ErrUnknownField when f is not one of the known inventory field names.
func set(o *InventoryObject, f string, v interface{}) error {
	var err error
	switch f {
	case bucketFieldName:
		o.Bucket, err = cast.ToStringE(v)
	case keyFieldName:
		o.Key, err = cast.ToStringE(v)
	case isLatestFieldName:
		o.IsLatest, err = cast.ToBoolE(v)
	case isDeleteMarkerFieldName:
		o.IsDeleteMarker, err = cast.ToBoolE(v)
	case sizeFieldName:
		o.Size, err = cast.ToInt64E(v)
	case lastModifiedDateFieldName:
		var lastModifiedMillis int64
		lastModifiedMillis, err = cast.ToInt64E(v)
		if err != nil {
			// Do not overwrite LastModified with a bogus epoch timestamp
			// derived from the zero value of a failed conversion.
			return err
		}
		// Split into whole seconds and the millisecond remainder so the
		// multiplication to nanoseconds cannot overflow for large inputs.
		seconds := lastModifiedMillis / int64(time.Second/time.Millisecond)
		ns := (lastModifiedMillis % 1000) * int64(time.Millisecond/time.Nanosecond)
		o.LastModified = swag.Time(time.Unix(seconds, ns))
	case eTagFieldName:
		o.Checksum, err = cast.ToStringE(v)
	default:
		return fmt.Errorf("%w: %s", ErrUnknownField, f)
	}
	return err
}

// getParquetPaths maps each known inventory field present in the schema to its
// path in the ParquetReader. A schema entry is matched by comparing its ExName
// against the names in inventoryFields; entries that match none are ignored.
func getParquetPaths(schemaHandler *schema.SchemaHandler) map[string]string {
	paths := make(map[string]string)
	for idx := range schemaHandler.Infos {
		exName := schemaHandler.Infos[idx].ExName
		for _, known := range inventoryFields {
			if known != exName {
				continue
			}
			// IndexMap is keyed by the schema element's index as int32.
			paths[known] = schemaHandler.IndexMap[int32(idx)]
		}
	}
	return paths
}
Loading

0 comments on commit b78e5c6

Please sign in to comment.