Skip to content

Commit

Permalink
Merge branch 'main' into metrics-evaluator
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroshade authored Aug 19, 2024
2 parents 66797d6 + ea71202 commit 7a111bd
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 15 deletions.
7 changes: 5 additions & 2 deletions io/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@

package io

import "os"
import (
"os"
"strings"
)

// LocalFS is an implementation of IO that implements interaction with
// the local file system.
type LocalFS struct{}

func (LocalFS) Open(name string) (File, error) {
return os.Open(name)
return os.Open(strings.TrimPrefix(name, "file://"))
}

func (LocalFS) Remove(name string) error {
Expand Down
59 changes: 46 additions & 13 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,28 @@ func (m *manifestFileV2) FetchEntries(fs iceio.IO, discardDeleted bool) ([]Manif
return fetchManifestEntries(m, fs, discardDeleted)
}

func getFieldIDMap(sc avro.Schema) map[string]int {
getField := func(rs *avro.RecordSchema, name string) *avro.Field {
for _, f := range rs.Fields() {
if f.Name() == name {
return f
}
}
return nil
}

result := make(map[string]int)
entryField := getField(sc.(*avro.RecordSchema), "data_file")
partitionField := getField(entryField.Type().(*avro.RecordSchema), "partition")

for _, field := range partitionField.Type().(*avro.RecordSchema).Fields() {
if fid, ok := field.Prop("field-id").(float64); ok {
result[field.Name()] = int(fid)
}
}
return result
}

func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
f, err := fs.Open(m.FilePath())
if err != nil {
Expand All @@ -376,15 +398,16 @@ func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]M
}

metadata := dec.Metadata()
sc, err := avro.ParseBytes(dec.Metadata()["avro.schema"])
if err != nil {
return nil, err
}

fieldNameToID := getFieldIDMap(sc)
isVer1, isFallback := true, false
if string(metadata["format-version"]) == "2" {
isVer1 = false
} else {
sc, err := avro.ParseBytes(dec.Metadata()["avro.schema"])
if err != nil {
return nil, err
}

for _, f := range sc.(*avro.RecordSchema).Fields() {
if f.Name() == "snapshot_id" {
if f.Type().Type() == avro.Union {
Expand Down Expand Up @@ -418,6 +441,7 @@ func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]M

if !discardDeleted || tmp.Status() != EntryStatusDELETED {
tmp.inheritSeqNum(m)
tmp.DataFile().setFieldNameToIDMap(fieldNameToID)
results = append(results, tmp)
}
}
Expand Down Expand Up @@ -598,25 +622,25 @@ func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V {
}

func avroPartitionData(input map[string]any) map[string]any {
// hamba/avro2 will unmarshal a map[string]any such that
// each entry will actually be a map[string]interface{} with the key
// being the avro type.
// hambra/avro/v2 will unmarshal a map[string]any such that
// each entry will actually be a map[string]any with the key being
// the avro type, not the field name.
//
// This means that partition data that looks like:
// This means that partition data that looks like this:
//
// [{"field-id": 1000, "name": "ts", "type": {"type": "int", "logicalType": "date"}}]
//
// Becomes:
//
// map[string]any{"ts": map[string]any{"int.date": time.Time{}}}
//
// so we need to simplify our map and make partition data handling easier
out := map[string]any{}
// so we need to simplify our map and make the partition data handling easier
out := make(map[string]any)
for k, v := range input {
switch v := v.(type) {
case map[string]any:
for typname, val := range v {
switch typname {
for typeName, val := range v {
switch typeName {
case "int.date":
out[k] = Date(val.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))
case "int.time-millis":
Expand Down Expand Up @@ -670,6 +694,11 @@ type dataFile struct {
lowerBoundMap map[int][]byte
upperBoundMap map[int][]byte

// not used for anything yet, but important to maintain the information
// for future development and updates such as when we get to writes,
// and scan planning
fieldNameToID map[string]int

initMaps sync.Once
}

Expand All @@ -686,6 +715,8 @@ func (d *dataFile) initializeMapData() {
})
}

func (d *dataFile) setFieldNameToIDMap(m map[string]int) { d.fieldNameToID = m }

func (d *dataFile) ContentType() ManifestEntryContent { return d.Content }
func (d *dataFile) FilePath() string { return d.Path }
func (d *dataFile) FileFormat() FileFormat { return d.Format }
Expand Down Expand Up @@ -894,6 +925,8 @@ type DataFile interface {
// SortOrderID returns the id representing the sort order for this
// file, or nil if there is no sort order.
SortOrderID() *int

setFieldNameToIDMap(map[string]int)
}

// ManifestEntry is an interface for both v1 and v2 manifest entries.
Expand Down

0 comments on commit 7a111bd

Please sign in to comment.