From b9ba9b6baf66ec3e5a8585897c6439dabaa9bf4c Mon Sep 17 00:00:00 2001 From: johnnyaug Date: Thu, 20 May 2021 18:04:55 +0300 Subject: [PATCH 1/3] fix manifest sort bug --- pkg/block/s3/inventory.go | 1 + pkg/block/s3/inventory_iterator.go | 1 + pkg/cloud/aws/s3inventory/parquet_reader.go | 21 +++++++++++++++------ 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/pkg/block/s3/inventory.go b/pkg/block/s3/inventory.go index f56e87479b8..8f8feaadc2e 100644 --- a/pkg/block/s3/inventory.go +++ b/pkg/block/s3/inventory.go @@ -146,6 +146,7 @@ func sortManifest(m *Manifest, logger logging.Logger, reader s3inventory.IReader } m.Files[i].firstKey = mr.FirstObjectKey() m.Files[i].lastKey = mr.LastObjectKey() + logger.Debugf("file %s first %s last %s", m.Files[i].Key, m.Files[i].firstKey, m.Files[i].lastKey) err = mr.Close() if err != nil { logger.Errorf("failed to close inventory file. file=%s, err=%w", f, err) diff --git a/pkg/block/s3/inventory_iterator.go b/pkg/block/s3/inventory_iterator.go index 694e1ada0ad..0e4df895530 100644 --- a/pkg/block/s3/inventory_iterator.go +++ b/pkg/block/s3/inventory_iterator.go @@ -53,6 +53,7 @@ func (it *InventoryIterator) Next() bool { if val != nil { // validate element order if it.shouldSort && it.val != nil && val.Key < it.val.Key { + it.logger.Debugf("%s < %s", val.Key, it.val.Key) it.err = ErrInventoryNotSorted return false } diff --git a/pkg/cloud/aws/s3inventory/parquet_reader.go b/pkg/cloud/aws/s3inventory/parquet_reader.go index c14c1da610d..e556251f3a6 100644 --- a/pkg/cloud/aws/s3inventory/parquet_reader.go +++ b/pkg/cloud/aws/s3inventory/parquet_reader.go @@ -35,20 +35,29 @@ func (p *ParquetInventoryFileReader) Close() error { return p.PFile.Close() } -func (p *ParquetInventoryFileReader) getKeyColumnStatistics() *parquet.Statistics { - for i, c := range p.Footer.RowGroups[0].Columns { +func (p *ParquetInventoryFileReader) getKeyColumnStatistics(rowGroupIdx int) *parquet.Statistics { + for i, c := range p.Footer.RowGroups[rowGroupIdx].Columns { if c.MetaData.PathInSchema[len(c.GetMetaData().GetPathInSchema())-1] == "Key" { - return p.Footer.RowGroups[0].Columns[i].GetMetaData().GetStatistics() + return p.Footer.RowGroups[rowGroupIdx].Columns[i].GetMetaData().GetStatistics() } } - return p.Footer.RowGroups[0].Columns[1].GetMetaData().GetStatistics() + return p.Footer.RowGroups[rowGroupIdx].Columns[1].GetMetaData().GetStatistics() } + func (p *ParquetInventoryFileReader) FirstObjectKey() string { - return string(p.getKeyColumnStatistics().GetMin()) + statistics := p.getKeyColumnStatistics(0) + if len(statistics.GetMin()) > 0 { + return string(statistics.GetMin()) + } + return string(statistics.GetMinValue()) } func (p *ParquetInventoryFileReader) LastObjectKey() string { - return string(p.getKeyColumnStatistics().GetMax()) + statistics := p.getKeyColumnStatistics(len(p.Footer.RowGroups) - 1) + if len(statistics.GetMax()) > 0 { + return string(statistics.GetMax()) + } + return string(statistics.GetMinValue()) } func (p *ParquetInventoryFileReader) Read(n int) ([]*InventoryObject, error) { From 92f100aee36dae9a21384b1a39e53c267a2e8a8e Mon Sep 17 00:00:00 2001 From: johnnyaug Date: Thu, 20 May 2021 18:07:36 +0300 Subject: [PATCH 2/3] remove logging --- pkg/block/s3/inventory.go | 1 - pkg/block/s3/inventory_iterator.go | 1 - 2 files changed, 2 deletions(-) diff --git a/pkg/block/s3/inventory.go b/pkg/block/s3/inventory.go index 8f8feaadc2e..f56e87479b8 100644 --- a/pkg/block/s3/inventory.go +++ b/pkg/block/s3/inventory.go @@ -146,7 +146,6 @@ func sortManifest(m *Manifest, logger logging.Logger, reader s3inventory.IReader } m.Files[i].firstKey = mr.FirstObjectKey() m.Files[i].lastKey = mr.LastObjectKey() - logger.Debugf("file %s first %s last %s", m.Files[i].Key, m.Files[i].firstKey, m.Files[i].lastKey) err = mr.Close() if err != nil { logger.Errorf("failed to close inventory file. file=%s, err=%w", f, err) diff --git a/pkg/block/s3/inventory_iterator.go b/pkg/block/s3/inventory_iterator.go index 0e4df895530..694e1ada0ad 100644 --- a/pkg/block/s3/inventory_iterator.go +++ b/pkg/block/s3/inventory_iterator.go @@ -53,7 +53,6 @@ func (it *InventoryIterator) Next() bool { if val != nil { // validate element order if it.shouldSort && it.val != nil && val.Key < it.val.Key { - it.logger.Debugf("%s < %s", val.Key, it.val.Key) it.err = ErrInventoryNotSorted return false } From 3aea4637d84a5484864e350a91678ed252604404 Mon Sep 17 00:00:00 2001 From: johnnyaug Date: Sun, 23 May 2021 14:34:09 +0300 Subject: [PATCH 3/3] cr fixes --- pkg/cloud/aws/s3inventory/parquet_reader.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/cloud/aws/s3inventory/parquet_reader.go b/pkg/cloud/aws/s3inventory/parquet_reader.go index e556251f3a6..d56982a54d9 100644 --- a/pkg/cloud/aws/s3inventory/parquet_reader.go +++ b/pkg/cloud/aws/s3inventory/parquet_reader.go @@ -36,12 +36,14 @@ func (p *ParquetInventoryFileReader) Close() error { } func (p *ParquetInventoryFileReader) getKeyColumnStatistics(rowGroupIdx int) *parquet.Statistics { - for i, c := range p.Footer.RowGroups[rowGroupIdx].Columns { - if c.MetaData.PathInSchema[len(c.GetMetaData().GetPathInSchema())-1] == "Key" { - return p.Footer.RowGroups[rowGroupIdx].Columns[i].GetMetaData().GetStatistics() + columns := p.Footer.RowGroups[rowGroupIdx].Columns + for _, c := range columns { + metaData := c.GetMetaData() + if metaData.GetPathInSchema()[len(metaData.GetPathInSchema())-1] == "Key" { + return metaData.GetStatistics() } } - return p.Footer.RowGroups[rowGroupIdx].Columns[1].GetMetaData().GetStatistics() + return columns[1].GetMetaData().GetStatistics() } func (p *ParquetInventoryFileReader) FirstObjectKey() string {