From b78e5c6ad96d766238bbfe8872737db61fe0cdc3 Mon Sep 17 00:00:00 2001
From: johnnyaug
Date: Mon, 12 Oct 2020 20:02:26 +0300
Subject: [PATCH] 557 import tool usability (#800)

* parquet reader: interface change, fix crash on missing fields
* skip optional, empty fields
* import tool usability+tests
* fix test
* order
* order
* CR fixes + simplifications: remove pointers from InventoryObject, change time field to time.Time
* CR fix: run test once for each format
* CR fix: add default to switch
* add sub-second part of modification time + tests
---
 block/inventory.go                      |   2 +-
 block/s3/inventory_iterator.go          |  20 +-
 block/s3/inventory_test.go              |  25 +-
 cloud/aws/s3inventory/orc_reader.go     |  64 ++---
 cloud/aws/s3inventory/parquet_reader.go | 108 +++++++-
 cloud/aws/s3inventory/reader.go         |  59 +++-
 cloud/aws/s3inventory/reader_test.go    | 340 ++++++++++++++++--------
 cloud/aws/s3inventory/utils_test.go     |  95 +++++++
 go.mod                                  |   1 +
 onboard/catalog_actions.go              |   2 +-
 onboard/inventory_test.go               |   6 +-
 onboard/utils_test.go                   |   3 +-
 12 files changed, 520 insertions(+), 205 deletions(-)
 create mode 100644 cloud/aws/s3inventory/utils_test.go

diff --git a/block/inventory.go b/block/inventory.go
index 20b44eb617b..b4824c189f3 100644
--- a/block/inventory.go
+++ b/block/inventory.go
@@ -23,7 +23,7 @@ type InventoryObject struct {
 	Bucket          string
 	Key             string
 	Size            int64
-	LastModified    time.Time
+	LastModified    *time.Time
 	Checksum        string
 	PhysicalAddress string
 }
diff --git a/block/s3/inventory_iterator.go b/block/s3/inventory_iterator.go
index bd4c964af26..247607445e6 100644
--- a/block/s3/inventory_iterator.go
+++ b/block/s3/inventory_iterator.go
@@ -17,7 +17,7 @@ type InventoryIterator struct {
 	*Inventory
 	err                    error
 	val                    *block.InventoryObject
-	buffer                 []s3inventory.InventoryObject
+	buffer                 []*s3inventory.InventoryObject
 	inventoryFileIndex     int
 	valIndexInBuffer       int
 	inventoryFileProgress  *cmdutils.Progress
@@ -94,8 +94,7 @@ func (it *InventoryIterator) fillBuffer() bool {
 			it.logger.Errorf("failed to close manifest file reader. file=%s, err=%w", it.Manifest.Files[it.inventoryFileIndex].Key, err)
 		}
 	}()
-	it.buffer = make([]s3inventory.InventoryObject, rdr.GetNumRows())
-	err = rdr.Read(&it.buffer)
+	it.buffer, err = rdr.Read(int(rdr.GetNumRows()))
 	if err != nil {
 		it.err = err
 		return false
@@ -106,23 +105,16 @@ func (it *InventoryIterator) nextFromBuffer() *block.InventoryObject {
 	for i := it.valIndexInBuffer + 1; i < len(it.buffer); i++ {
 		obj := it.buffer[i]
-		if (obj.IsLatest != nil && !*obj.IsLatest) ||
-			(obj.IsDeleteMarker != nil && *obj.IsDeleteMarker) {
+		if !obj.IsLatest || obj.IsDeleteMarker {
 			continue
 		}
 		res := block.InventoryObject{
 			Bucket:          obj.Bucket,
 			Key:             obj.Key,
 			PhysicalAddress: obj.GetPhysicalAddress(),
-		}
-		if obj.Size != nil {
-			res.Size = *obj.Size
-		}
-		if obj.LastModifiedMillis != nil {
-			res.LastModified = time.Unix(*obj.LastModifiedMillis/int64(time.Second/time.Millisecond), 0)
-		}
-		if obj.Checksum != nil {
-			res.Checksum = *obj.Checksum
+			Size:            obj.Size,
+			LastModified:    obj.LastModified,
+			Checksum:        obj.Checksum,
 		}
 		it.valIndexInBuffer = i
 		return &res
diff --git a/block/s3/inventory_test.go b/block/s3/inventory_test.go
index 3b3cd8d3353..505779e8e68 100644
--- a/block/s3/inventory_test.go
+++ b/block/s3/inventory_test.go
@@ -30,10 +30,10 @@ func rows(keys []string, lastModified map[string]time.Time) []*s3inventory.Inven
 		if key != "" {
 			res[i] = new(s3inventory.InventoryObject)
 			res[i].Key = key
-			res[i].IsLatest = swag.Bool(!strings.Contains(key, "_expired"))
-			res[i].IsDeleteMarker = swag.Bool(strings.Contains(key, "_del"))
+			res[i].IsLatest = !strings.Contains(key, "_expired")
+			res[i].IsDeleteMarker = strings.Contains(key, "_del")
 			if lastModified != nil {
-				res[i].LastModifiedMillis = swag.Int64(lastModified[key].Unix() * 1000)
+				res[i].LastModified = swag.Time(lastModified[key])
 			}
 		}
 	}
@@ -196,9 +196,8 @@ func TestIterator(t *testing.T) {
 				if obj.Key != test.ExpectedObjects[i] {
 					t.Fatalf("at index %d: expected=%s, got=%s", i, test.ExpectedObjects[i], obj.Key)
 				}
-				expectedLastModified := lastModified[obj.Key].Truncate(time.Second)
-				if obj.LastModified != expectedLastModified {
-					t.Fatalf("last modified for object in index %d different than expected. expected=%v, got=%v", i, expectedLastModified, obj.LastModified)
+				if *obj.LastModified != lastModified[obj.Key] {
+					t.Fatalf("last modified for object in index %d different than expected. expected=%v, got=%v", i, lastModified[obj.Key], obj.LastModified)
 				}
 			}
 		}
@@ -246,18 +245,16 @@ func (m *mockInventoryFileReader) Close() error {
 	return nil
 }
 
-func (m *mockInventoryFileReader) Read(dstInterface interface{}) error {
-	res := make([]s3inventory.InventoryObject, 0, len(m.rows))
-	dst := dstInterface.(*[]s3inventory.InventoryObject)
-	for i := m.nextIdx; i < len(m.rows) && i < m.nextIdx+len(*dst); i++ {
+func (m *mockInventoryFileReader) Read(n int) ([]*s3inventory.InventoryObject, error) {
+	res := make([]*s3inventory.InventoryObject, 0, len(m.rows))
+	for i := m.nextIdx; i < len(m.rows) && i < m.nextIdx+n; i++ {
 		if m.rows[i] == nil {
-			return ErrReadFile // for test - simulate file with error
+			return nil, ErrReadFile // for test - simulate file with error
 		}
-		res = append(res, *m.rows[i])
+		res = append(res, m.rows[i])
 	}
 	m.nextIdx = m.nextIdx + len(res)
-	*dst = res
-	return nil
+	return res, nil
 }
 
 func (m *mockInventoryFileReader) GetNumRows() int64 {
diff --git a/cloud/aws/s3inventory/orc_reader.go b/cloud/aws/s3inventory/orc_reader.go
index c75b9449aec..7aba6c54852 100644
--- a/cloud/aws/s3inventory/orc_reader.go
+++ b/cloud/aws/s3inventory/orc_reader.go
@@ -2,7 +2,6 @@ package s3inventory
 
 import (
 	"context"
-	"reflect"
 	"time"
 
 	"github.com/go-openapi/swag"
@@ -30,7 +29,6 @@ type OrcSelect struct {
 }
 
 func getOrcSelect(typeDescription *orc.TypeDescription) *OrcSelect {
-	relevantFields := []string{"bucket", "key", "size", "last_modified_date", "e_tag", "is_delete_marker", "is_latest"}
 	res := &OrcSelect{
 		SelectFields:  nil,
 		IndexInFile:   make(map[string]int),
@@ -40,7 +38,7 @@ func getOrcSelect(typeDescription *orc.TypeDescription) *OrcSelect {
 		res.IndexInFile[field] = i
 	}
 	j := 0
-	for _, field := range relevantFields {
+	for _, field := range inventoryFields {
 		if _, ok := res.IndexInFile[field]; ok {
 			res.SelectFields = append(res.SelectFields, field)
 			res.IndexInSelect[field] = j
@@ -50,45 +48,34 @@ func getOrcSelect(typeDescription *orc.TypeDescription) *OrcSelect {
 	return res
 }
 
-func (r *OrcInventoryFileReader) inventoryObjectFromRow(rowData []interface{}) InventoryObject {
-	var size *int64
-	if sizeIdx, ok := r.orcSelect.IndexInSelect["size"]; ok && rowData[sizeIdx] != nil {
-		size = swag.Int64(rowData[sizeIdx].(int64))
+func (r *OrcInventoryFileReader) inventoryObjectFromRow(rowData []interface{}) *InventoryObject {
+	obj := NewInventoryObject()
+	obj.Bucket = rowData[r.orcSelect.IndexInSelect[bucketFieldName]].(string)
+	obj.Key = rowData[r.orcSelect.IndexInSelect[keyFieldName]].(string)
+	if sizeIdx, ok := r.orcSelect.IndexInSelect[sizeFieldName]; ok && rowData[sizeIdx] != nil {
+		obj.Size = rowData[sizeIdx].(int64)
 	}
-	var lastModifiedMillis *int64
-	if lastModifiedIdx, ok := r.orcSelect.IndexInSelect["last_modified_date"]; ok && rowData[lastModifiedIdx] != nil {
-		lastModifiedMillis = swag.Int64(rowData[lastModifiedIdx].(time.Time).UnixNano() / int64(time.Millisecond))
+	if lastModifiedIdx, ok := r.orcSelect.IndexInSelect[lastModifiedDateFieldName]; ok && rowData[lastModifiedIdx] != nil {
+		obj.LastModified = swag.Time(rowData[lastModifiedIdx].(time.Time))
 	}
-	var eTag *string
-	if eTagIdx, ok := r.orcSelect.IndexInSelect["e_tag"]; ok && rowData[eTagIdx] != nil {
-		eTag = swag.String(rowData[eTagIdx].(string))
+	if eTagIdx, ok := r.orcSelect.IndexInSelect[eTagFieldName]; ok && rowData[eTagIdx] != nil {
+		obj.Checksum = rowData[eTagIdx].(string)
 	}
-	var isLatest *bool
-	if isLatestIdx, ok := r.orcSelect.IndexInSelect["is_latest"]; ok && rowData[isLatestIdx] != nil {
-		isLatest = swag.Bool(rowData[isLatestIdx].(bool))
+	if isLatestIdx, ok := r.orcSelect.IndexInSelect[isLatestFieldName]; ok && rowData[isLatestIdx] != nil {
+		obj.IsLatest = rowData[isLatestIdx].(bool)
 	}
-	var isDeleteMarker *bool
-	if isDeleteMarkerIdx, ok := r.orcSelect.IndexInSelect["is_delete_marker"]; ok && rowData[isDeleteMarkerIdx] != nil {
-		isDeleteMarker = swag.Bool(rowData[isDeleteMarkerIdx].(bool))
-	}
-	return InventoryObject{
-		Bucket:             rowData[r.orcSelect.IndexInSelect["bucket"]].(string),
-		Key:                rowData[r.orcSelect.IndexInSelect["key"]].(string),
-		Size:               size,
-		LastModifiedMillis: lastModifiedMillis,
-		Checksum:           eTag,
-		IsLatest:           isLatest,
-		IsDeleteMarker:     isDeleteMarker,
+	if isDeleteMarkerIdx, ok := r.orcSelect.IndexInSelect[isDeleteMarkerFieldName]; ok && rowData[isDeleteMarkerIdx] != nil {
+		obj.IsDeleteMarker = rowData[isDeleteMarkerIdx].(bool)
 	}
+	return obj
 }
 
-func (r *OrcInventoryFileReader) Read(dstInterface interface{}) error {
-	num := reflect.ValueOf(dstInterface).Elem().Len()
-	res := make([]InventoryObject, 0, num)
+func (r *OrcInventoryFileReader) Read(n int) ([]*InventoryObject, error) {
+	res := make([]*InventoryObject, 0, n)
 	for {
 		select {
 		case <-r.ctx.Done():
-			return r.ctx.Err()
+			return nil, r.ctx.Err()
 		default:
 		}
 		if !r.cursor.Next() {
@@ -99,14 +86,13 @@ func (r *OrcInventoryFileReader) Read(dstInterface interface{}) error {
 				break
 			}
 		}
-		res = append(res, r.inventoryObjectFromRow(r.cursor.Row()))
-		if len(res) == num {
+		obj := r.inventoryObjectFromRow(r.cursor.Row())
+		res = append(res, obj)
+		if len(res) == n {
 			break
 		}
 	}
-
-	reflect.ValueOf(dstInterface).Elem().Set(reflect.ValueOf(res))
-	return nil
+	return res, nil
 }
 
 func (r *OrcInventoryFileReader) GetNumRows() int64 {
@@ -128,9 +114,9 @@ func (r *OrcInventoryFileReader) Close() error {
 }
 
 func (r *OrcInventoryFileReader) FirstObjectKey() string {
-	return *r.reader.Metadata().StripeStats[0].GetColStats()[r.orcSelect.IndexInFile["key"]+1].StringStatistics.Minimum
+	return *r.reader.Metadata().StripeStats[0].GetColStats()[r.orcSelect.IndexInFile[keyFieldName]+1].StringStatistics.Minimum
 }
 
 func (r *OrcInventoryFileReader) LastObjectKey() string {
-	return *r.reader.Metadata().StripeStats[0].GetColStats()[r.orcSelect.IndexInFile["key"]+1].StringStatistics.Maximum
+	return *r.reader.Metadata().StripeStats[0].GetColStats()[r.orcSelect.IndexInFile[keyFieldName]+1].StringStatistics.Maximum
 }
diff --git a/cloud/aws/s3inventory/parquet_reader.go b/cloud/aws/s3inventory/parquet_reader.go
index 33b5255427a..cc3ca586cfb 100644
--- a/cloud/aws/s3inventory/parquet_reader.go
+++ b/cloud/aws/s3inventory/parquet_reader.go
@@ -1,9 +1,34 @@
 package s3inventory
 
-import "github.com/xitongsys/parquet-go/reader"
+import (
+	"fmt"
+	"time"
+
+	"github.com/cznic/mathutil"
+	"github.com/go-openapi/swag"
+	"github.com/spf13/cast"
+	"github.com/xitongsys/parquet-go/parquet"
+	"github.com/xitongsys/parquet-go/reader"
+	"github.com/xitongsys/parquet-go/schema"
+)
 
 type ParquetInventoryFileReader struct {
-	reader.ParquetReader
+	*reader.ParquetReader
+	nextRow            int64
+	fieldToParquetPath map[string]string
+}
+
+func NewParquetInventoryFileReader(parquetReader *reader.ParquetReader) (*ParquetInventoryFileReader, error) {
+	fieldToParquetPath := getParquetPaths(parquetReader.SchemaHandler)
+	for _, required := range requiredFields {
+		if _, ok := fieldToParquetPath[required]; !ok {
+			return nil, fmt.Errorf("%w: %s", ErrRequiredFieldNotFound, required)
+		}
+	}
+	return &ParquetInventoryFileReader{
+		ParquetReader:      parquetReader,
+		fieldToParquetPath: fieldToParquetPath,
+	}, nil
 }
 
 func (p *ParquetInventoryFileReader) Close() error {
@@ -11,10 +36,85 @@ func (p *ParquetInventoryFileReader) Close() error {
 	return p.PFile.Close()
 }
 
+func (p *ParquetInventoryFileReader) getKeyColumnStatistics() *parquet.Statistics {
+	for i, c := range p.Footer.RowGroups[0].Columns {
+		if c.MetaData.PathInSchema[len(c.GetMetaData().GetPathInSchema())-1] == "Key" {
+			return p.Footer.RowGroups[0].Columns[i].GetMetaData().GetStatistics()
+		}
+	}
+	return p.Footer.RowGroups[0].Columns[1].GetMetaData().GetStatistics()
+}
+
 func (p *ParquetInventoryFileReader) FirstObjectKey() string {
-	return string(p.Footer.RowGroups[0].Columns[0].GetMetaData().GetStatistics().GetMinValue())
+	return string(p.getKeyColumnStatistics().GetMin())
 }
 
 func (p *ParquetInventoryFileReader) LastObjectKey() string {
-	return string(p.Footer.RowGroups[0].Columns[0].GetMetaData().GetStatistics().GetMaxValue())
+	return string(p.getKeyColumnStatistics().GetMax())
+}
+
+func (p *ParquetInventoryFileReader) Read(n int) ([]*InventoryObject, error) {
+	num := mathutil.MinInt64(int64(n), p.GetNumRows()-p.nextRow)
+	p.nextRow += num
+	res := make([]*InventoryObject, num)
+	for fieldName, path := range p.fieldToParquetPath {
+		columnRes, _, dls, err := p.ReadColumnByPath(path, num)
+		if err != nil {
+			return nil, fmt.Errorf("failed to read parquet column %s: %w", fieldName, err)
+		}
+		for i, v := range columnRes {
+			if !isRequired(fieldName) && dls[i] == 0 {
+				// got no value for non-required field, move on
+				continue
+			}
+			if res[i] == nil {
+				res[i] = NewInventoryObject()
+			}
+			err := set(res[i], fieldName, v)
+			if err != nil {
+				return nil, fmt.Errorf("failed to read parquet column %s: %w", fieldName, err)
+			}
+		}
+	}
+	return res, nil
+}
+
+func set(o *InventoryObject, f string, v interface{}) error {
+	var err error
+	switch f {
+	case bucketFieldName:
+		o.Bucket, err = cast.ToStringE(v)
+	case keyFieldName:
+		o.Key, err = cast.ToStringE(v)
+	case isLatestFieldName:
+		o.IsLatest, err = cast.ToBoolE(v)
+	case isDeleteMarkerFieldName:
+		o.IsDeleteMarker, err = cast.ToBoolE(v)
+	case sizeFieldName:
+		o.Size, err = cast.ToInt64E(v)
+	case lastModifiedDateFieldName:
+		var lastModifiedMillis int64
+		lastModifiedMillis, err = cast.ToInt64E(v)
+		seconds := lastModifiedMillis / int64(time.Second/time.Millisecond)
+		ns := (lastModifiedMillis % 1000) * int64(time.Millisecond/time.Nanosecond)
+		o.LastModified = swag.Time(time.Unix(seconds, ns))
+	case eTagFieldName:
+		o.Checksum, err = cast.ToStringE(v)
+	default:
+		return fmt.Errorf("%w: %s", ErrUnknownField, f)
+	}
+	return err
+}
+
+// getParquetPaths returns parquet schema fields as a mapping from their base column name to their path in ParquetReader
+// only known inventory fields are returned
+func getParquetPaths(schemaHandler *schema.SchemaHandler) map[string]string {
+	res := make(map[string]string)
+	for i, fieldInfo := range schemaHandler.Infos {
+		for _, field := range inventoryFields {
+			if fieldInfo.ExName == field {
+				res[field] = schemaHandler.IndexMap[int32(i)]
+			}
+		}
+	}
+	return res
 }
diff --git a/cloud/aws/s3inventory/reader.go b/cloud/aws/s3inventory/reader.go
index 18c46211b81..34a34a74fec 100644
--- a/cloud/aws/s3inventory/reader.go
+++ b/cloud/aws/s3inventory/reader.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	"github.com/aws/aws-sdk-go/service/s3/s3iface"
 	"github.com/scritchley/orc"
@@ -12,6 +13,28 @@ import (
 	"github.com/xitongsys/parquet-go/reader"
 )
 
+const (
+	bucketFieldName           = "bucket"
+	keyFieldName              = "key"
+	sizeFieldName             = "size"
+	lastModifiedDateFieldName = "last_modified_date"
+	eTagFieldName             = "e_tag"
+	isDeleteMarkerFieldName   = "is_delete_marker"
+	isLatestFieldName         = "is_latest"
+)
+
+var inventoryFields = []string{
+	bucketFieldName,
+	keyFieldName,
+	sizeFieldName,
+	lastModifiedDateFieldName,
+	eTagFieldName,
+	isDeleteMarkerFieldName,
+	isLatestFieldName,
+}
+
+var requiredFields = []string{bucketFieldName, keyFieldName}
+
 const (
 	OrcFormatName     = "ORC"
 	ParquetFormatName = "Parquet"
@@ -19,6 +42,8 @@ const (
 
 var (
 	ErrUnsupportedInventoryFormat = errors.New("unsupported inventory type. supported types: parquet, orc")
+	ErrRequiredFieldNotFound      = errors.New("required field not found in inventory")
+	ErrUnknownField               = errors.New("unknown field")
 )
 
 type IReader interface {
@@ -27,13 +52,17 @@ type IReader interface {
 }
 
 type InventoryObject struct {
-	Bucket             string  `parquet:"name=bucket, type=UTF8"`
-	Key                string  `parquet:"name=key, type=UTF8"`
-	IsLatest           *bool   `parquet:"name=is_latest, type=BOOLEAN"`
-	IsDeleteMarker     *bool   `parquet:"name=is_delete_marker, type=BOOLEAN"`
-	Size               *int64  `parquet:"name=size, type=INT_64"`
-	LastModifiedMillis *int64  `parquet:"name=last_modified_date, type=TIMESTAMP_MILLIS"`
-	Checksum           *string `parquet:"name=e_tag, type=UTF8"`
+	Bucket         string
+	Key            string
+	IsLatest       bool
+	IsDeleteMarker bool
+	Size           int64
+	LastModified   *time.Time
+	Checksum       string
+}
+
+func NewInventoryObject() *InventoryObject {
+	return &InventoryObject{IsLatest: true}
 }
 
 func (o *InventoryObject) GetPhysicalAddress() string {
@@ -55,7 +84,7 @@ type MetadataReader interface {
 
 type FileReader interface {
 	MetadataReader
-	Read(dstInterface interface{}) error
+	Read(n int) ([]*InventoryObject, error)
 }
 
 func NewReader(ctx context.Context, svc s3iface.S3API, logger logging.Logger) IReader {
@@ -87,12 +116,11 @@ func (o *Reader) getParquetReader(bucket string, key string) (FileReader, error)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create parquet file reader: %w", err)
 	}
-	var rawObject InventoryObject
-	pr, err := reader.NewParquetReader(pf, &rawObject, 4)
+	pr, err := reader.NewParquetReader(pf, nil, 4)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create parquet reader: %w", err)
 	}
-	return &ParquetInventoryFileReader{ParquetReader: *pr}, nil
+	return NewParquetInventoryFileReader(pr)
 }
 
 func (o *Reader) getOrcReader(bucket string, key string, tailOnly bool) (FileReader, error) {
@@ -113,3 +141,12 @@ func (o *Reader) getOrcReader(bucket string, key string, tailOnly bool) (FileRea
 		cursor: orcReader.Select(orcSelect.SelectFields...),
 	}, nil
 }
+
+func isRequired(field string) bool {
+	for _, f := range requiredFields {
+		if f == field {
+			return true
+		}
+	}
+	return false
+}
diff --git a/cloud/aws/s3inventory/reader_test.go b/cloud/aws/s3inventory/reader_test.go
index 2131d2c7426..39d4778275f 100644
--- a/cloud/aws/s3inventory/reader_test.go
+++ b/cloud/aws/s3inventory/reader_test.go
@@ -2,115 +2,174 @@ package s3inventory
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"io/ioutil"
-	"net/http/httptest"
 	"os"
+	"strings"
 	"testing"
 	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
-	"github.com/aws/aws-sdk-go/aws/credentials"
-	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/s3"
-	"github.com/aws/aws-sdk-go/service/s3/s3iface"
-	"github.com/aws/aws-sdk-go/service/s3/s3manager"
 	"github.com/cznic/mathutil"
 	"github.com/go-openapi/swag"
-	"github.com/johannesboyne/gofakes3"
-	"github.com/johannesboyne/gofakes3/backend/s3mem"
 	"github.com/scritchley/orc"
 	"github.com/treeverse/lakefs/logging"
+	"github.com/xitongsys/parquet-go-source/local"
+	"github.com/xitongsys/parquet-go/schema"
+	"github.com/xitongsys/parquet-go/writer"
 )
 
 const inventoryBucketName = "inventory-bucket"
 
-func generateOrc(t *testing.T, objs <-chan *InventoryObject) string {
-	f, err := ioutil.TempFile("", "orctest")
+type TestObject struct {
+	Bucket             string  `parquet:"name=bucket, type=UTF8"`
+	Key                string  `parquet:"name=key, type=UTF8"`
+	IsLatest           *bool   `parquet:"name=is_latest, type=BOOLEAN"`
+	IsDeleteMarker     *bool   `parquet:"name=is_delete_marker, type=BOOLEAN"`
+	Size               *int64  `parquet:"name=size, type=INT_64"`
+	LastModifiedMillis *int64  `parquet:"name=last_modified_date, type=TIMESTAMP_MILLIS"`
+	Checksum           *string `parquet:"name=e_tag, type=UTF8"`
+}
+
+func parquetSchema(fieldToRemove string) *schema.JSONSchemaItemType {
+	fieldMap := map[string]string{
+		bucketFieldName:           "name=bucket, inname=Bucket, type=UTF8, repetitiontype=REQUIRED, fieldid=1",
+		keyFieldName:              "name=key, inname=Key, type=UTF8, repetitiontype=REQUIRED, fieldid=2",
+		isLatestFieldName:         "name=is_latest, inname=IsLatest, type=BOOLEAN, repetitiontype=OPTIONAL, fieldid=3",
+		isDeleteMarkerFieldName:   "name=is_delete_marker, inname=IsDeleteMarker, type=BOOLEAN, repetitiontype=OPTIONAL, fieldid=4",
+		sizeFieldName:             "name=size, inname=Size, type=INT64, repetitiontype=OPTIONAL, fieldid=5",
+		lastModifiedDateFieldName: "name=last_modified_date, inname=LastModifiedMillis, type=INT64, repetitiontype=OPTIONAL, fieldid=6",
+		eTagFieldName:             "name=e_tag, inname=Checksum, type=UTF8, repetitiontype=OPTIONAL, fieldid=7",
+	}
+	fields := make([]*schema.JSONSchemaItemType, 0, len(fieldMap))
+	for field, tag := range fieldMap {
+		if field == fieldToRemove {
+			continue
+		}
+		fields = append(fields, &schema.JSONSchemaItemType{Tag: tag})
+	}
+	return &schema.JSONSchemaItemType{
+		Tag:    "name=s3inventory, repetitiontype=REQUIRED",
+		Fields: fields,
+	}
+}
+
+func generateParquet(t *testing.T, objs <-chan *TestObject, fieldToRemove string) *os.File {
+	f, err := ioutil.TempFile("", "parquettest")
 	if err != nil {
-		t.Fatal(err)
+		t.Fatalf("failed to create temp file: %v", err)
 	}
 	defer func() {
-		_ = f.Close()
+		_ = os.Remove(f.Name())
 	}()
-	schema, err := orc.ParseSchema("struct<bucket:string,key:string,size:int,last_modified_date:timestamp,e_tag:string>")
+	fw, err := local.NewLocalFileWriter(f.Name())
 	if err != nil {
-		t.Fatal(err)
+		t.Fatalf("failed to create parquet file writer: %v", err)
 	}
-	w, err := orc.NewWriter(f, orc.SetSchema(schema), orc.SetStripeTargetSize(100))
+	defer func() {
+		_ = fw.Close()
+	}()
+	jsonSchema, err := json.Marshal(parquetSchema(fieldToRemove))
+	jsonSchemaStr := string(jsonSchema)
+	pw, err := writer.NewParquetWriter(fw, jsonSchemaStr, 4)
 	if err != nil {
-		t.Fatal(err)
+		t.Fatalf("failed to create parquet writer: %v", err)
 	}
-	for o := range objs {
-		err = w.Write(o.Bucket, o.Key, *o.Size, time.Unix(*o.LastModifiedMillis/1000, 0), *o.Checksum)
+	for obj := range objs {
+		err = pw.Write(obj)
 		if err != nil {
-			t.Fatal(err)
+			t.Fatalf("failed to write object to parquet: %v", err)
 		}
 	}
-	err = w.Close()
+	err = pw.WriteStop()
 	if err != nil {
-		t.Fatal(err)
+		t.Fatalf("failed to stop parquet writer: %v", err)
 	}
-	return f.Name()
+	return f
 }
 
-func getS3Fake(t *testing.T) (s3iface.S3API, *httptest.Server) {
-	backend := s3mem.New()
-	faker := gofakes3.New(backend)
-	ts := httptest.NewServer(faker.Server())
-	// configure S3 client
-	s3Config := &aws.Config{
-		Credentials:      credentials.NewStaticCredentials("YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", ""),
-		Endpoint:         aws.String(ts.URL),
-		Region:           aws.String("eu-central-1"),
-		DisableSSL:       aws.Bool(true),
-		S3ForcePathStyle: aws.Bool(true),
-	}
-	newSession, err := session.NewSession(s3Config)
-	if err != nil {
-		t.Fatal(err)
+func orcSchema(fieldToRemove string) string {
+	fieldTypes := map[string]string{
+		bucketFieldName:           "string",
+		keyFieldName:              "string",
+		isLatestFieldName:         "boolean",
+		isDeleteMarkerFieldName:   "boolean",
+		sizeFieldName:             "int",
+		lastModifiedDateFieldName: "timestamp",
+		eTagFieldName:             "string",
 	}
-	return s3.New(newSession), ts
+	var orcSchema strings.Builder
+	orcSchema.WriteString("struct<")
+	isFirst := true
+	for _, field := range inventoryFields {
+		if fieldToRemove == field {
+			continue
+		}
+		if !isFirst {
+			orcSchema.WriteString(",")
+		}
+		isFirst = false
+		orcSchema.WriteString(fmt.Sprintf("%s:%s", field, fieldTypes[field]))
+	}
+	orcSchema.WriteString(">")
+	return orcSchema.String()
 }
 
-func objs(num int, lastModified []time.Time) <-chan *InventoryObject {
-	out := make(chan *InventoryObject)
-	go func() {
-		defer close(out)
-		for i := 0; i < num; i++ {
-			out <- &InventoryObject{
-				Bucket:             inventoryBucketName,
-				Key:                fmt.Sprintf("f%05d", i),
-				Size:               swag.Int64(500),
-				LastModifiedMillis: swag.Int64(lastModified[i%len(lastModified)].Unix() * 1000),
-				Checksum:           swag.String("abcdefg"),
-			}
+func getOrcValues(o *TestObject, fieldToRemove string) []interface{} {
+	fieldValues := map[string]interface{}{
+		bucketFieldName:         o.Bucket,
+		keyFieldName:            o.Key,
+		isLatestFieldName:       o.IsLatest == nil || swag.BoolValue(o.IsLatest),
+		isDeleteMarkerFieldName: swag.BoolValue(o.IsDeleteMarker),
+		sizeFieldName:           swag.Int64Value(o.Size),
+		lastModifiedDateFieldName: time.Unix(swag.Int64Value(o.LastModifiedMillis)/1000,
+			(swag.Int64Value(o.LastModifiedMillis)%1000)*1_000_000),
+		eTagFieldName: swag.StringValue(o.Checksum),
+	}
+	values := make([]interface{}, 0, len(fieldValues))
+	for _, field := range inventoryFields {
+		if fieldToRemove == field {
+			continue
 		}
-	}()
-	return out
+		values = append(values, fieldValues[field])
+	}
+	return values
 }
 
-func uploadFile(t *testing.T, s3 s3iface.S3API, inventoryBucket string, inventoryFilename string, objs <-chan *InventoryObject) {
-	localOrcFile := generateOrc(t, objs)
-	f, err := os.Open(localOrcFile)
+func generateOrc(t *testing.T, objs <-chan *TestObject, fieldToRemove string) *os.File {
+	f, err := ioutil.TempFile("", "orctest")
 	if err != nil {
-		t.Fatal(err)
+		t.Fatalf("failed to create temp file: %v", err)
 	}
 	defer func() {
-		_ = f.Close()
+		_ = os.Remove(f.Name())
 	}()
-	uploader := s3manager.NewUploaderWithClient(s3)
-	_, err = uploader.Upload(&s3manager.UploadInput{
-		Bucket: aws.String(inventoryBucket),
-		Key:    aws.String(inventoryFilename),
-		Body:   f,
-	})
+	orcSchema := orcSchema(fieldToRemove)
+	s, err := orc.ParseSchema(orcSchema)
 	if err != nil {
-		t.Fatal(err)
+		t.Fatalf("failed to parse orc schema: %v", err)
+	}
+	w, err := orc.NewWriter(f, orc.SetSchema(s), orc.SetStripeTargetSize(100))
+	if err != nil {
+		t.Fatalf("failed to create orc writer: %v", err)
+	}
+	for o := range objs {
+		err = w.Write(getOrcValues(o, fieldToRemove)...)
+		if err != nil {
+			t.Fatalf("failed to write object to orc: %v", err)
+		}
+	}
+	err = w.Close()
+	if err != nil {
+		t.Fatalf("failed to close orc writer: %v", err)
 	}
+	_, _ = f.Seek(0, 0)
+	return f
 }
 
-func TestInventoryReader(t *testing.T) {
+func TestReaders(t *testing.T) {
 	svc, testServer := getS3Fake(t)
 	defer testServer.Close()
 	_, err := svc.CreateBucket(&s3.CreateBucketInput{
@@ -119,82 +178,129 @@
 	if err != nil {
 		t.Fatal(err)
 	}
-	testdata := []struct {
+	testdata := map[string]struct {
 		ObjectNum           int
 		ExpectedReadObjects int
 		ExpectedMaxValue    string
 		ExpectedMinValue    string
+		ExcludeField        string
 	}{
-		{
+		"2 objects": {
 			ObjectNum:           2,
 			ExpectedReadObjects: 2,
 			ExpectedMinValue:    "f00000",
			ExpectedMaxValue:    "f00001",
 		},
-		{
+		"12500 objects": {
+			ObjectNum:           12500,
+			ExpectedReadObjects: 12500,
+			ExpectedMinValue:    "f00000",
+			ExpectedMaxValue:    "f12499",
+		},
+		"100 objects": {
+			ObjectNum:           100,
+			ExpectedReadObjects: 100,
+			ExpectedMinValue:    "f00000",
+			ExpectedMaxValue:    "f00099",
+		},
+		"without size field": {
+			ObjectNum:           12500,
+			ExpectedReadObjects: 12500,
+			ExpectedMinValue:    "f00000",
+			ExpectedMaxValue:    "f12499",
+			ExcludeField:        "size",
+		},
+		"without is_latest field": {
+			ObjectNum:           12500,
+			ExpectedReadObjects: 12500,
+			ExpectedMinValue:    "f00000",
+			ExpectedMaxValue:    "f12499",
+			ExcludeField:        "is_latest",
+		},
+		"without is_delete_marker field": {
 			ObjectNum:           12500,
 			ExpectedReadObjects: 12500,
 			ExpectedMinValue:    "f00000",
 			ExpectedMaxValue:    "f12499",
+			ExcludeField:        "is_delete_marker",
 		},
-		{
+		"without e_tag field": {
 			ObjectNum:           100,
 			ExpectedReadObjects: 100,
 			ExpectedMinValue:    "f00000",
 			ExpectedMaxValue:    "f00099",
+			ExcludeField:        "e_tag",
 		},
 	}
 
-	for _, test := range testdata {
-		now := time.Now()
-		lastModified := []time.Time{now, now.Add(-1 * time.Hour), now.Add(-2 * time.Hour), now.Add(-3 * time.Hour)}
-		uploadFile(t, svc, inventoryBucketName, "myFile.orc", objs(test.ObjectNum, lastModified))
-		reader := NewReader(context.Background(), svc, logging.Default())
-		fileReader, err := reader.GetFileReader("ORC", inventoryBucketName, "myFile.orc")
-		if err != nil {
-			t.Fatal(err)
-		}
-		numRowsResult := int(fileReader.GetNumRows())
-		if test.ObjectNum != numRowsResult {
-			t.Fatalf("unexpected result from GetNumRows. expected=%d, got=%d", test.ObjectNum, numRowsResult)
-		}
-		minValueResult := fileReader.FirstObjectKey()
-		if test.ExpectedMinValue != minValueResult {
-			t.Fatalf("unexpected result from FirstObjectKey. expected=%s, got=%s", test.ExpectedMinValue, minValueResult)
-		}
-		maxValueResult := fileReader.LastObjectKey()
-		if test.ExpectedMaxValue != maxValueResult {
-			t.Fatalf("unexpected result from LastObjectKey. expected=%s, got=%s", test.ExpectedMaxValue, maxValueResult)
-		}
-		readBatchSize := 1000
-		res := make([]InventoryObject, readBatchSize)
-		offset := 0
-		readCount := 0
-		for {
-			err = fileReader.Read(&res)
-			for i := offset; i < mathutil.Min(offset+readBatchSize, test.ObjectNum); i++ {
-				if res[i-offset].Key != fmt.Sprintf("f%05d", i) {
-					t.Fatalf("result in index %d different than expected. expected=%s, got=%s (batch #%d, index %d)", i, fmt.Sprintf("f%05d", i), res[i-offset].Key, offset/readBatchSize, i-offset)
-				}
-				expectedLastModified := lastModified[i%len(lastModified)].Unix() * 1000
-				if *res[i-offset].LastModifiedMillis != expectedLastModified {
-					t.Fatalf("unexpected timestamp for result in index %d. expected=%d, got=%d (batch #%d, index %d)", i, expectedLastModified, *res[i-offset].LastModifiedMillis, offset/readBatchSize, i-offset)
-				}
-			}
-			offset += len(res)
-			readCount += len(res)
-			if err != nil {
-				t.Fatal(err)
-			}
-			if len(res) != readBatchSize {
-				break
-			}
-		}
-		if test.ExpectedReadObjects != readCount {
-			t.Fatalf("read unexpected number of keys from inventory. expected=%d, got=%d", test.ExpectedReadObjects, readCount)
-		}
-		if fileReader.Close() != nil {
-			t.Fatalf("failed to close file reader")
-		}
-	}
+	for _, format := range []string{"ORC", "Parquet"} {
+		for testName, test := range testdata {
+			t.Run(fmt.Sprintf("%s %s", strings.ToLower(format), testName), func(t *testing.T) {
+				now := time.Now().Truncate(time.Millisecond)
+				lastModified := []time.Time{now, now.Add(-1 * time.Hour), now.Add(-2 * time.Hour), now.Add(-3 * time.Hour)}
+				var localFile *os.File
+				if format == "ORC" {
+					localFile = generateOrc(t, objs(test.ObjectNum, lastModified), test.ExcludeField)
+				} else if format == "Parquet" {
+					localFile = generateParquet(t, objs(test.ObjectNum, lastModified), test.ExcludeField)
+				}
+				uploadFile(t, svc, inventoryBucketName, "myFile.inv", localFile)
+				reader := NewReader(context.Background(), svc, logging.Default())
+				fileReader, err := reader.GetFileReader(format, inventoryBucketName, "myFile.inv")
+				if err != nil {
+					t.Fatalf("failed to create file reader: %v", err)
+				}
+				numRowsResult := int(fileReader.GetNumRows())
+				if test.ObjectNum != numRowsResult {
+					t.Fatalf("unexpected result from GetNumRows. expected=%d, got=%d", test.ObjectNum, numRowsResult)
+				}
+				minValueResult := fileReader.FirstObjectKey()
+				if test.ExpectedMinValue != minValueResult {
+					t.Fatalf("unexpected result from FirstObjectKey. expected=%s, got=%s", test.ExpectedMinValue, minValueResult)
+				}
+				maxValueResult := fileReader.LastObjectKey()
+				if test.ExpectedMaxValue != maxValueResult {
+					t.Fatalf("unexpected result from LastObjectKey. expected=%s, got=%s", test.ExpectedMaxValue, maxValueResult)
+				}
+				readBatchSize := 1000
+				offset := 0
+				readCount := 0
+				for {
+					res, err := fileReader.Read(readBatchSize)
+					if err != nil {
+						t.Fatalf("failed to read from file reader: %v", err)
+					}
+					expectedSize := 500
+					if test.ExcludeField == "size" {
+						expectedSize = 0
+					}
+					expectedChecksum := "abcdefg"
+					if test.ExcludeField == "e_tag" {
+						expectedChecksum = ""
+					}
+					for i := offset; i < mathutil.Min(offset+readBatchSize, test.ObjectNum); i++ {
+						verifyObject(t, res[i-offset], &InventoryObject{
+							Bucket:         inventoryBucketName,
+							Key:            fmt.Sprintf("f%05d", i),
+							IsLatest:       true,
+							IsDeleteMarker: false,
+							Size:           int64(expectedSize),
+							LastModified:   &lastModified[i%len(lastModified)],
+							Checksum:       expectedChecksum,
+						}, i, offset/readBatchSize, i-offset)
+					}
+					offset += len(res)
+					readCount += len(res)
+					if len(res) != readBatchSize {
+						break
+					}
+				}
+				if test.ExpectedReadObjects != readCount {
+					t.Fatalf("read unexpected number of keys from inventory. expected=%d, got=%d", test.ExpectedReadObjects, readCount)
+				}
+				if fileReader.Close() != nil {
+					t.Fatalf("failed to close file reader")
+				}
+			})
+		}
+	}
 }
diff --git a/cloud/aws/s3inventory/utils_test.go b/cloud/aws/s3inventory/utils_test.go
new file mode 100644
index 00000000000..cbcd676a789
--- /dev/null
+++ b/cloud/aws/s3inventory/utils_test.go
@@ -0,0 +1,95 @@
+package s3inventory
+
+import (
+	"fmt"
+	"net/http/httptest"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/aws/aws-sdk-go/service/s3/s3iface"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	"github.com/johannesboyne/gofakes3"
+	"github.com/johannesboyne/gofakes3/backend/s3mem"
+
+	"github.com/go-openapi/swag"
+)
+
+func verifyObject(t *testing.T, actual *InventoryObject, expected *InventoryObject, index int, batchId int, indexInBatch int) {
+	if expected.Bucket != actual.Bucket {
+		t.Fatalf("bucket in index %d different than expected. expected=%s, got=%s (batch #%d, index %d)", index, expected.Bucket, actual.Bucket, batchId, indexInBatch)
+	}
+	if expected.Key != actual.Key {
+		t.Fatalf("object key in index %d different than expected. expected=%s, got=%s (batch #%d, index %d)", index, expected.Key, actual.Key, batchId, indexInBatch)
+	}
+	if expected.Size != actual.Size {
+		t.Fatalf("size in index %d different than expected. expected=%d, got=%d (batch #%d, index %d)", index, expected.Size, actual.Size, batchId, indexInBatch)
+	}
+	if expected.Checksum != actual.Checksum {
+		t.Fatalf("e_tag in index %d different than expected. expected=%s, got=%s (batch #%d, index %d)", index, expected.Checksum, actual.Checksum, batchId, indexInBatch)
+	}
+	if !expected.LastModified.Equal(*actual.LastModified) {
+		t.Fatalf("last_modified_time in index %d different than expected. expected=%v, got=%v (batch #%d, index %d)", index, expected.LastModified, actual.LastModified, batchId, indexInBatch)
+	}
+	if expected.IsDeleteMarker != actual.IsDeleteMarker {
+		t.Fatalf("is_delete_marker in index %d different than expected. expected=%v, got=%v (batch #%d, index %d)", index, expected.IsDeleteMarker, actual.IsDeleteMarker, batchId, indexInBatch)
+	}
+	if actual.IsLatest != expected.IsLatest {
+		t.Fatalf("is_latest in index %d different than expected. expected=%v, got=%v (batch #%d, index %d)", index, expected.IsLatest, actual.IsLatest, batchId, indexInBatch)
+	}
+}
+
+func objs(num int, lastModified []time.Time) <-chan *TestObject {
+	out := make(chan *TestObject)
+	go func() {
+		defer close(out)
+		for i := 0; i < num; i++ {
+			out <- &TestObject{
+				Bucket:             inventoryBucketName,
+				Key:                fmt.Sprintf("f%05d", i),
+				Size:               swag.Int64(500),
+				LastModifiedMillis: swag.Int64(lastModified[i%len(lastModified)].UnixNano() / 1_000_000),
+				Checksum:           swag.String("abcdefg"),
+			}
+		}
+	}()
+	return out
+}
+
+func uploadFile(t *testing.T, s3 s3iface.S3API, inventoryBucket string, inventoryFilename string, f *os.File) {
+	defer func() {
+		_ = f.Close()
+	}()
+	uploader := s3manager.NewUploaderWithClient(s3)
+	_, err := uploader.Upload(&s3manager.UploadInput{
+		Bucket: aws.String(inventoryBucket),
+		Key:    aws.String(inventoryFilename),
+		Body:   f,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func getS3Fake(t *testing.T) (s3iface.S3API, *httptest.Server) {
+	backend := s3mem.New()
+	faker := gofakes3.New(backend)
+	ts := httptest.NewServer(faker.Server())
+	// configure S3 client
+	s3Config := &aws.Config{
+		Credentials:      credentials.NewStaticCredentials("YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", ""),
+		Endpoint:         aws.String(ts.URL),
+		Region:           aws.String("eu-central-1"),
+		DisableSSL:       aws.Bool(true),
+		S3ForcePathStyle: aws.Bool(true),
+	}
+	newSession, err := session.NewSession(s3Config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return s3.New(newSession), ts
+}
diff --git a/go.mod b/go.mod
index 7579f61d4a9..306b709c8c7 100644
--- a/go.mod
+++ b/go.mod
@@ -66,6 +66,7 @@ require (
 	github.com/sirupsen/logrus v1.6.0
 	github.com/smartystreets/assertions v1.1.1 // indirect
 	github.com/spf13/afero v1.3.4 // indirect
+	github.com/spf13/cast v1.3.1
 	github.com/spf13/cobra v1.0.0
 	github.com/spf13/pflag v1.0.5
 	github.com/spf13/viper v1.7.1
diff --git a/onboard/catalog_actions.go b/onboard/catalog_actions.go
index c86959a949e..f3af8b52cbb 100644
--- a/onboard/catalog_actions.go
+++ b/onboard/catalog_actions.go
@@ -93,7 +93,7 @@ func (c *CatalogRepoActions) ApplyImport(ctx context.Context, it Iterator, dryRu
 		entry := catalog.Entry{
 			Path:            obj.Key,
 			PhysicalAddress: obj.PhysicalAddress,
-			CreationDate:    obj.LastModified,
+			CreationDate:    *obj.LastModified,
 			Size:            obj.Size,
 			Checksum:        obj.Checksum,
 		}
diff --git a/onboard/inventory_test.go b/onboard/inventory_test.go
index 318517e6e64..573a05d66d4 100644
--- a/onboard/inventory_test.go
+++ b/onboard/inventory_test.go
@@ -154,7 +154,7 @@ func TestDiff(t *testing.T) {
 		if actualAdded[i].Key != expectedAdded {
 			t.Fatalf("added object in diff index %d different than expected. expected: %s, got: %s", i, expectedAdded, actualAdded[i].Key)
 		}
-		if actualAdded[i].LastModified != times[expectedAdded] {
+		if *actualAdded[i].LastModified != times[expectedAdded] {
 			t.Fatalf("modified time for key %s different than expected. expected: %v, got: %v", expectedAdded, times[expectedAdded], actualAdded[i].LastModified)
 		}
 	}
@@ -162,7 +162,7 @@ func TestDiff(t *testing.T) {
 		if actualDeleted[i].Key != expectedDeleted {
 			t.Fatalf("deleted object in diff index %d different than expected. expected: %s, got: %s", i, expectedDeleted, actualDeleted[i].Key)
 		}
-		if actualDeleted[i].LastModified != times[expectedDeleted] {
+		if *actualDeleted[i].LastModified != times[expectedDeleted] {
 			t.Fatalf("modified time for key %s different than expected. expected: %v, got: %v", expectedDeleted, times[expectedDeleted], actualDeleted[i].LastModified)
 		}
 	}
@@ -170,7 +170,7 @@ func TestDiff(t *testing.T) {
 		if actualChanged[i].Key != expectedChanged {
 			t.Fatalf("changed object in diff index %d different than expected. expected: %s, got: %s", i, expectedChanged, actualChanged[i].Key)
 		}
-		if actualChanged[i].LastModified != times[expectedChanged] {
+		if *actualChanged[i].LastModified != times[expectedChanged] {
 			t.Fatalf("modified time for key %s different than expected. expected: %v, got: %v", expectedChanged, times[expectedChanged], actualChanged[i].LastModified)
 		}
 	}
diff --git a/onboard/utils_test.go b/onboard/utils_test.go
index 0edd46968db..4c3a4c26420 100644
--- a/onboard/utils_test.go
+++ b/onboard/utils_test.go
@@ -6,6 +6,7 @@ import (
 	"sort"
 	"time"
 
+	"github.com/go-openapi/swag"
 	"github.com/treeverse/lakefs/block"
 	"github.com/treeverse/lakefs/catalog"
 	"github.com/treeverse/lakefs/cmdutils"
@@ -66,7 +67,7 @@ func (m *mockInventory) rows() []block.InventoryObject {
 	}
 
 	for i, key := range m.keys {
-		res = append(res, block.InventoryObject{Key: key, LastModified: m.lastModified[i%len(m.lastModified)], Checksum: m.checksum(key)})
+		res = append(res, block.InventoryObject{Key: key, LastModified: swag.Time(m.lastModified[i%len(m.lastModified)]), Checksum: m.checksum(key)})
 	}
 	return res
 }