diff --git a/go.mod b/go.mod index 5b05903..827297b 100644 --- a/go.mod +++ b/go.mod @@ -24,10 +24,11 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.18.44 github.com/aws/aws-sdk-go-v2/credentials v1.13.42 github.com/aws/aws-sdk-go-v2/service/s3 v1.40.1 + github.com/google/uuid v1.3.1 github.com/hamba/avro/v2 v2.16.0 github.com/stretchr/testify v1.8.4 github.com/wolfeidau/s3iofs v1.3.0 - golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb + golang.org/x/exp v0.0.0-20231006140011-7918f672742d ) require ( diff --git a/go.sum b/go.sum index 4b892ec..6e1fc0f 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,8 @@ github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hamba/avro/v2 v2.16.0 h1:0XhyP65Hs8iMLtdSR0v7ZrwRjsbIZdvr7KzYgmx1Mbo= github.com/hamba/avro/v2 v2.16.0/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= @@ -68,8 +70,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/wolfeidau/s3iofs v1.3.0 h1:O6lf4HCZVuHMjCdMYrpQIfKzLDYbO5BtNrH5RdQGv4c= github.com/wolfeidau/s3iofs v1.3.0/go.mod h1:oWH749wXNok7pJ+5MK7mi5uyS7GqQzeGApEGwUSaIjc= -golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb h1:mIKbk8weKhSeLH2GmUTrvx8CjkyJmnU1wFmg59CUjFA= -golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/table/metadata.go b/table/metadata.go new file mode 100644 index 0000000..957e163 --- /dev/null +++ b/table/metadata.go @@ -0,0 +1,401 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package table
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+
+	"github.com/apache/iceberg-go"
+
+	"github.com/google/uuid"
+)
+
+// Metadata for an Iceberg table as specified in the Iceberg spec
+//
+// https://iceberg.apache.org/spec/#iceberg-table-spec
+type Metadata interface {
+	// Version indicates the version of this metadata, 1 for V1, 2 for V2, etc.
+	Version() int
+	// TableUUID returns a UUID that identifies the table, generated when the
+	// table is created. Implementations must return an error if a table's
+	// UUID does not match the expected UUID after refreshing metadata.
+	TableUUID() uuid.UUID
+	// Location is the table's base location. This is used by writers to determine
+	// where to store data files, manifest files, and table metadata files.
+	Location() string
+	// LastUpdatedMillis is the timestamp in milliseconds from the Unix epoch when
+	// the table was last updated. Each table metadata file should update this
+	// field just before writing.
+	LastUpdatedMillis() int64
+	// LastColumnID returns the highest assigned column ID for the table.
+	// This is used to ensure fields are always assigned an unused ID when
+	// evolving schemas.
+	LastColumnID() int
+	// Schemas returns the list of schemas, stored as objects with their
+	// schema-id.
+	Schemas() []*iceberg.Schema
+	// CurrentSchema returns the table's current schema.
+	CurrentSchema() *iceberg.Schema
+	// PartitionSpecs returns the list of all partition specs in the table.
+	PartitionSpecs() []iceberg.PartitionSpec
+	// PartitionSpec returns the current partition spec that the table is using.
+	PartitionSpec() iceberg.PartitionSpec
+	// DefaultPartitionSpec is the ID of the current spec that writers should
+	// use by default.
+	DefaultPartitionSpec() int
+	// LastPartitionSpecID is the highest assigned partition field ID across
+	// all partition specs for the table. This is used to ensure partition
+	// fields are always assigned an unused ID when evolving specs.
+	LastPartitionSpecID() *int
+	// Snapshots returns the list of valid snapshots. Valid snapshots are
+	// snapshots for which all data files exist in the file system. A data
+	// file must not be deleted from the file system until the last snapshot
+	// in which it was listed is garbage collected.
+	Snapshots() []Snapshot
+	// SnapshotByID finds and returns a specific snapshot by its ID. Returns
+	// nil if the ID is not found in the list of snapshots.
+	SnapshotByID(int64) *Snapshot
+	// SnapshotByName searches the list of snapshots for a snapshot with a given
+	// ref name. Returns nil if there's no ref with this name for a snapshot.
+	SnapshotByName(name string) *Snapshot
+	// CurrentSnapshot returns the table's current snapshot.
+	CurrentSnapshot() *Snapshot
+	// SortOrder returns the table's current sort order, i.e. the one with the
+	// ID that matches the default-sort-order-id.
+	SortOrder() SortOrder
+	// SortOrders returns the list of sort orders in the table.
+	SortOrders() []SortOrder
+	// Properties is a string-to-string map of table properties. This is used
+	// to control settings that affect reading and writing and is not intended
+	// to be used for arbitrary metadata. For example, commit.retry.num-retries
+	// is used to control the number of commit retries.
+ Properties() iceberg.Properties +} + +var ( + ErrInvalidMetadataFormatVersion = errors.New("invalid or missing format-version in table metadata") + ErrInvalidMetadata = errors.New("invalid metadata") +) + +// ParseMetadata parses json metadata provided by the passed in reader, +// returning an error if one is encountered. +func ParseMetadata(r io.Reader) (Metadata, error) { + data, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + return ParseMetadataBytes(data) +} + +// ParseMetadataString is like [ParseMetadata], but for a string rather than +// an io.Reader. +func ParseMetadataString(s string) (Metadata, error) { + return ParseMetadataBytes([]byte(s)) +} + +// ParseMetadataBytes is like [ParseMetadataString] but for a byte slice. +func ParseMetadataBytes(b []byte) (Metadata, error) { + ver := struct { + FormatVersion int `json:"format-version"` + }{} + if err := json.Unmarshal(b, &ver); err != nil { + return nil, err + } + + var ret Metadata + switch ver.FormatVersion { + case 1: + ret = &MetadataV1{} + case 2: + ret = &MetadataV2{} + default: + return nil, ErrInvalidMetadataFormatVersion + } + + return ret, json.Unmarshal(b, ret) +} + +// https://iceberg.apache.org/spec/#iceberg-table-spec +type commonMetadata struct { + FormatVersion int `json:"format-version"` + UUID uuid.UUID `json:"table-uuid"` + Loc string `json:"location"` + LastUpdatedMS int64 `json:"last-updated-ms"` + LastColumnId int `json:"last-column-id"` + SchemaList []*iceberg.Schema `json:"schemas"` + CurrentSchemaID int `json:"current-schema-id"` + Specs []iceberg.PartitionSpec `json:"partition-specs"` + DefaultSpecID int `json:"default-spec-id"` + LastPartitionID *int `json:"last-partition-id,omitempty"` + Props iceberg.Properties `json:"properties"` + SnapshotList []Snapshot `json:"snapshots,omitempty"` + CurrentSnapshotID *int64 `json:"current-snapshot-id,omitempty"` + SnapshotLog []SnapshotLogEntry `json:"snapshot-log"` + MetadataLog []MetadataLogEntry `json:"metadata-log"` + SortOrderList []SortOrder `json:"sort-orders"` + DefaultSortOrderID int `json:"default-sort-order-id"` + Refs map[string]SnapshotRef `json:"refs"` +} + +func (c *commonMetadata) TableUUID() uuid.UUID { return c.UUID } +func (c *commonMetadata) Location() string { return c.Loc } +func (c *commonMetadata) LastUpdatedMillis() int64 { return c.LastUpdatedMS } +func (c *commonMetadata) LastColumnID() int { return c.LastColumnId } +func (c *commonMetadata) Schemas() []*iceberg.Schema { return c.SchemaList } +func (c *commonMetadata) CurrentSchema() *iceberg.Schema { + for _, s := range c.SchemaList { + if s.ID == c.CurrentSchemaID { + return s + } + } + panic("should never get here") +} + +func (c *commonMetadata) PartitionSpecs() []iceberg.PartitionSpec { + return c.Specs +} + +func (c *commonMetadata) DefaultPartitionSpec() int { + return c.DefaultSpecID +} + +func (c *commonMetadata) PartitionSpec() iceberg.PartitionSpec { + for _, s := range c.Specs { + if s.ID() == c.DefaultSpecID { + return s + } + } + return *iceberg.UnpartitionedSpec +} + +func (c *commonMetadata) LastPartitionSpecID() *int { return c.LastPartitionID } +func (c *commonMetadata) Snapshots() []Snapshot { return c.SnapshotList } +func (c *commonMetadata) SnapshotByID(id int64) *Snapshot { + for i := range c.SnapshotList { + if c.SnapshotList[i].SnapshotID == id { + return &c.SnapshotList[i] + } + } + return nil +} + +func (c *commonMetadata) SnapshotByName(name string) *Snapshot { + if ref, ok := c.Refs[name]; ok { + return c.SnapshotByID(ref.SnapshotID) + } + 
return nil +} + +func (c *commonMetadata) CurrentSnapshot() *Snapshot { + if c.CurrentSnapshotID == nil { + return nil + } + return c.SnapshotByID(*c.CurrentSnapshotID) +} + +func (c *commonMetadata) SortOrders() []SortOrder { return c.SortOrderList } +func (c *commonMetadata) SortOrder() SortOrder { + for _, s := range c.SortOrderList { + if s.OrderID == c.DefaultSortOrderID { + return s + } + } + return UnsortedSortOrder +} + +func (c *commonMetadata) Properties() iceberg.Properties { + return c.Props +} + +// preValidate updates values in the metadata struct with defaults based on +// combinations of struct members. Such as initializing slices as empty slices +// if they were null in the metadata, or normalizing inconsistencies between +// metadata versions. +func (c *commonMetadata) preValidate() { + if c.CurrentSnapshotID != nil && *c.CurrentSnapshotID == -1 { + // treat -1 as the same as nil, clean this up in pre-validation + // to make the validation logic simplified later + c.CurrentSnapshotID = nil + } + + if c.CurrentSnapshotID != nil { + if _, ok := c.Refs[MainBranch]; !ok { + c.Refs[MainBranch] = SnapshotRef{ + SnapshotID: *c.CurrentSnapshotID, + SnapshotRefType: BranchRef, + } + } + } + + if c.MetadataLog == nil { + c.MetadataLog = []MetadataLogEntry{} + } + + if c.Refs == nil { + c.Refs = make(map[string]SnapshotRef) + } + + if c.SnapshotLog == nil { + c.SnapshotLog = []SnapshotLogEntry{} + } +} + +func (c *commonMetadata) checkSchemas() error { + // check that current-schema-id is present in schemas + for _, s := range c.SchemaList { + if s.ID == c.CurrentSchemaID { + return nil + } + } + + return fmt.Errorf("%w: current-schema-id %d can't be found in any schema", + ErrInvalidMetadata, c.CurrentSchemaID) +} + +func (c *commonMetadata) checkPartitionSpecs() error { + for _, spec := range c.Specs { + if spec.ID() == c.DefaultSpecID { + return nil + } + } + + return fmt.Errorf("%w: default-spec-id %d can't be found", + ErrInvalidMetadata, c.DefaultSpecID) +} + +func (c *commonMetadata) checkSortOrders() error { + if c.DefaultSortOrderID == UnsortedSortOrderID { + return nil + } + + for _, o := range c.SortOrderList { + if o.OrderID == c.DefaultSortOrderID { + return nil + } + } + + return fmt.Errorf("%w: default-sort-order-id %d can't be found in %+v", + ErrInvalidMetadata, c.DefaultSortOrderID, c.SortOrderList) +} + +func (c *commonMetadata) validate() error { + if err := c.checkSchemas(); err != nil { + return err + } + + if err := c.checkPartitionSpecs(); err != nil { + return err + } + + if err := c.checkSortOrders(); err != nil { + return err + } + + switch { + case c.LastUpdatedMS == 0: + // last-updated-ms is required + return fmt.Errorf("%w: missing last-updated-ms", ErrInvalidMetadata) + case c.LastColumnId == 0: + // last-column-id is required + return fmt.Errorf("%w: missing last-column-id", ErrInvalidMetadata) + } + + return nil +} + +func (c *commonMetadata) Version() int { return c.FormatVersion } + +type MetadataV1 struct { + Schema iceberg.Schema `json:"schema"` + Partition []iceberg.PartitionField `json:"partition-spec"` + + commonMetadata +} + +func (m *MetadataV1) preValidate() { + if len(m.SchemaList) == 0 { + m.SchemaList = []*iceberg.Schema{&m.Schema} + } + + if len(m.Specs) == 0 { + m.Specs = []iceberg.PartitionSpec{ + iceberg.NewPartitionSpec(m.Partition...)} + m.DefaultSpecID = m.Specs[0].ID() + } + + if m.LastPartitionID == nil { + id := m.Specs[0].LastAssignedFieldID() + for _, spec := range m.Specs[1:] { + last := spec.LastAssignedFieldID() + if last 
> id {
+				id = last
+			}
+		}
+		m.LastPartitionID = &id
+	}
+
+	if len(m.SortOrderList) == 0 {
+		m.SortOrderList = []SortOrder{UnsortedSortOrder}
+	}
+
+	m.commonMetadata.preValidate()
+}
+
+func (m *MetadataV1) UnmarshalJSON(b []byte) error {
+	type Alias MetadataV1
+	aux := (*Alias)(m)
+
+	if err := json.Unmarshal(b, aux); err != nil {
+		return err
+	}
+
+	m.preValidate()
+	return m.validate()
+}
+
+func (m *MetadataV1) ToV2() MetadataV2 {
+	commonOut := m.commonMetadata
+	commonOut.FormatVersion = 2
+	if commonOut.UUID == uuid.Nil {
+		commonOut.UUID = uuid.New()
+	}
+
+	return MetadataV2{commonMetadata: commonOut}
+}
+
+type MetadataV2 struct {
+	LastSequenceNumber int `json:"last-sequence-number"`
+
+	commonMetadata
+}
+
+func (m *MetadataV2) UnmarshalJSON(b []byte) error {
+	type Alias MetadataV2
+	aux := (*Alias)(m)
+
+	if err := json.Unmarshal(b, aux); err != nil {
+		return err
+	}
+
+	m.preValidate()
+	return m.validate()
+}
diff --git a/table/metadata_test.go b/table/metadata_test.go
new file mode 100644
index 0000000..080688f
--- /dev/null
+++ b/table/metadata_test.go
@@ -0,0 +1,491 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
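Reviewer note: a minimal usage sketch (editorial, not part of the diff) of the new parsing entry points in table/metadata.go. The JSON literal is a made-up, pared-down v1 document, not one of the fixtures used by the tests below.

package main

import (
	"fmt"
	"log"

	"github.com/apache/iceberg-go/table"
)

const minimalV1 = `{
	"format-version": 1,
	"location": "s3://bucket/example/location",
	"last-updated-ms": 1602638573874,
	"last-column-id": 1,
	"schema": {"type": "struct", "fields": [{"id": 1, "name": "x", "required": true, "type": "long"}]},
	"partition-spec": [],
	"properties": {},
	"current-snapshot-id": -1,
	"snapshots": []
}`

func main() {
	// ParseMetadataString dispatches on format-version: v1 documents come back
	// as *table.MetadataV1, v2 documents as *table.MetadataV2.
	meta, err := table.ParseMetadataString(minimalV1)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(meta.Version(), meta.Location())
	fmt.Println(meta.CurrentSchema().ID) // schema synthesized from the v1 "schema" field
	fmt.Println(meta.CurrentSnapshot())  // nil: -1 is normalized to "no current snapshot"
}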
+ +package table_test + +import ( + "encoding/json" + "testing" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ExampleTableMetadataV2 = `{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 1, + "schemas": [ + {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": true, "type": "long"}]}, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [1, 2], + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + } + ], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], + "last-partition-id": 1000, + "default-sort-order-id": 3, + "sort-orders": [ + { + "order-id": 3, + "fields": [ + {"transform": "identity", "source-id": 2, "direction": "asc", "null-order": "nulls-first"}, + {"transform": "bucket[4]", "source-id": 3, "direction": "desc", "null-order": "nulls-last"} + ] + } + ], + "properties": {"read.split.target.size": "134217728"}, + "current-snapshot-id": 3055729675574597004, + "snapshots": [ + { + "snapshot-id": 3051729675574597004, + "timestamp-ms": 1515100955770, + "sequence-number": 0, + "summary": {"operation": "append"}, + "manifest-list": "s3://a/b/1.avro" + }, + { + "snapshot-id": 3055729675574597004, + "parent-snapshot-id": 3051729675574597004, + "timestamp-ms": 1555100955770, + "sequence-number": 1, + "summary": {"operation": "append"}, + "manifest-list": "s3://a/b/2.avro", + "schema-id": 1 + } + ], + "snapshot-log": [ + {"snapshot-id": 3051729675574597004, "timestamp-ms": 1515100955770}, + {"snapshot-id": 3055729675574597004, "timestamp-ms": 1555100955770} + ], + "metadata-log": [{"metadata-file": "s3://bucket/.../v1.json", "timestamp-ms": 1515100}], + "refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}} +}` + +const ExampleTableMetadataV1 = `{ + "format-version": 1, + "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", + "location": "s3://bucket/test/location", + "last-updated-ms": 1602638573874, + "last-column-id": 3, + "schema": { + "type": "struct", + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + }, + "partition-spec": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}] +}` + +func TestMetadataV1Parsing(t *testing.T) { + meta, err := table.ParseMetadataBytes([]byte(ExampleTableMetadataV1)) + require.NoError(t, err) + require.NotNil(t, meta) + + assert.IsType(t, (*table.MetadataV1)(nil), meta) + assert.Equal(t, 1, meta.Version()) + + data := meta.(*table.MetadataV1) + assert.Equal(t, uuid.MustParse("d20125c8-7284-442c-9aea-15fee620737c"), meta.TableUUID()) + assert.Equal(t, "s3://bucket/test/location", meta.Location()) + assert.Equal(t, int64(1602638573874), meta.LastUpdatedMillis()) + assert.Equal(t, 3, 
meta.LastColumnID()) + + expected := iceberg.NewSchema( + 0, + iceberg.NestedField{ID: 1, Name: "x", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + iceberg.NestedField{ID: 2, Name: "y", Type: iceberg.PrimitiveTypes.Int64, Required: true, Doc: "comment"}, + iceberg.NestedField{ID: 3, Name: "z", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + ) + + assert.Equal(t, []*iceberg.Schema{expected}, meta.Schemas()) + assert.Zero(t, data.SchemaList[0].ID) + assert.True(t, meta.CurrentSchema().Equals(expected)) + assert.Equal(t, []iceberg.PartitionSpec{ + iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 1, FieldID: 1000, Transform: iceberg.IdentityTransform{}, Name: "x", + }), + }, meta.PartitionSpecs()) + + assert.Equal(t, iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 1, FieldID: 1000, Transform: iceberg.IdentityTransform{}, Name: "x", + }), meta.PartitionSpec()) + + assert.Equal(t, 0, meta.DefaultPartitionSpec()) + assert.Equal(t, 1000, *meta.LastPartitionSpecID()) + assert.Nil(t, data.CurrentSnapshotID) + assert.Nil(t, meta.CurrentSnapshot()) + assert.Len(t, meta.Snapshots(), 1) + assert.NotNil(t, meta.SnapshotByID(1925)) + assert.Nil(t, meta.SnapshotByID(0)) + assert.Nil(t, meta.SnapshotByName("foo")) + assert.Zero(t, data.DefaultSortOrderID) + assert.Equal(t, table.UnsortedSortOrder, meta.SortOrder()) +} + +func TestMetadataV2Parsing(t *testing.T) { + meta, err := table.ParseMetadataBytes([]byte(ExampleTableMetadataV2)) + require.NoError(t, err) + require.NotNil(t, meta) + + assert.IsType(t, (*table.MetadataV2)(nil), meta) + assert.Equal(t, 2, meta.Version()) + + data := meta.(*table.MetadataV2) + assert.Equal(t, uuid.MustParse("9c12d441-03fe-4693-9a96-a0705ddf69c1"), data.UUID) + assert.Equal(t, "s3://bucket/test/location", data.Location()) + assert.Equal(t, 34, data.LastSequenceNumber) + assert.Equal(t, int64(1602638573590), data.LastUpdatedMS) + assert.Equal(t, 3, data.LastColumnId) + assert.Equal(t, 0, data.SchemaList[0].ID) + assert.Equal(t, 1, data.CurrentSchemaID) + assert.Equal(t, 0, data.Specs[0].ID()) + assert.Equal(t, 0, data.DefaultSpecID) + assert.Equal(t, 1000, *data.LastPartitionID) + assert.EqualValues(t, "134217728", data.Props["read.split.target.size"]) + assert.EqualValues(t, 3055729675574597004, *data.CurrentSnapshotID) + assert.EqualValues(t, 3051729675574597004, data.SnapshotList[0].SnapshotID) + assert.Equal(t, int64(1515100955770), data.SnapshotLog[0].TimestampMs) + assert.Equal(t, 3, data.SortOrderList[0].OrderID) + assert.Equal(t, 3, data.DefaultSortOrderID) + + assert.Len(t, meta.Snapshots(), 2) + assert.Equal(t, data.SnapshotList[1], *meta.CurrentSnapshot()) + assert.Equal(t, data.SnapshotList[0], *meta.SnapshotByName("test")) + assert.EqualValues(t, "134217728", meta.Properties()["read.split.target.size"]) +} + +func TestParsingCorrectTypes(t *testing.T) { + var meta table.MetadataV2 + require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2), &meta)) + + assert.IsType(t, &iceberg.Schema{}, meta.SchemaList[0]) + assert.IsType(t, iceberg.NestedField{}, meta.SchemaList[0].Field(0)) + assert.IsType(t, iceberg.PrimitiveTypes.Int64, meta.SchemaList[0].Field(0).Type) +} + +func TestSerializeMetadataV1(t *testing.T) { + var meta table.MetadataV1 + require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV1), &meta)) + + data, err := json.Marshal(&meta) + require.NoError(t, err) + + assert.JSONEq(t, `{"location": "s3://bucket/test/location", "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", "last-updated-ms": 
1602638573874, "last-column-id": 3, "schemas": [{"type": "struct", "fields": [{"id": 1, "name": "x", "type": "long", "required": true}, {"id": 2, "name": "y", "type": "long", "required": true, "doc": "comment"}, {"id": 3, "name": "z", "type": "long", "required": true}], "schema-id": 0, "identifier-field-ids": []}], "current-schema-id": 0, "partition-specs": [{"spec-id": 0, "fields": [{"source-id": 1, "field-id": 1000, "transform": "identity", "name": "x"}]}], "default-spec-id": 0, "last-partition-id": 1000, "properties": {}, "snapshots": [{"snapshot-id": 1925, "sequence-number": 0, "timestamp-ms": 1602638573822}], "snapshot-log": [], "metadata-log": [], "sort-orders": [{"order-id": 0, "fields": []}], "default-sort-order-id": 0, "refs": {}, "format-version": 1, "schema": {"type": "struct", "fields": [{"id": 1, "name": "x", "type": "long", "required": true}, {"id": 2, "name": "y", "type": "long", "required": true, "doc": "comment"}, {"id": 3, "name": "z", "type": "long", "required": true}], "schema-id": 0, "identifier-field-ids": []}, "partition-spec": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}`, + string(data)) +} + +func TestSerializeMetadataV2(t *testing.T) { + var meta table.MetadataV2 + require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2), &meta)) + + data, err := json.Marshal(&meta) + require.NoError(t, err) + + assert.JSONEq(t, `{"location": "s3://bucket/test/location", "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", "last-updated-ms": 1602638573590, "last-column-id": 3, "schemas": [{"type": "struct", "fields": [{"id": 1, "name": "x", "type": "long", "required": true}], "schema-id": 0, "identifier-field-ids": []}, {"type": "struct", "fields": [{"id": 1, "name": "x", "type": "long", "required": true}, {"id": 2, "name": "y", "type": "long", "required": true, "doc": "comment"}, {"id": 3, "name": "z", "type": "long", "required": true}], "schema-id": 1, "identifier-field-ids": [1, 2]}], "current-schema-id": 1, "partition-specs": [{"spec-id": 0, "fields": [{"source-id": 1, "field-id": 1000, "transform": "identity", "name": "x"}]}], "default-spec-id": 0, "last-partition-id": 1000, "properties": {"read.split.target.size": "134217728"}, "current-snapshot-id": 3055729675574597004, "snapshots": [{"snapshot-id": 3051729675574597004, "sequence-number": 0, "timestamp-ms": 1515100955770, "manifest-list": "s3://a/b/1.avro", "summary": {"operation": "append"}}, {"snapshot-id": 3055729675574597004, "parent-snapshot-id": 3051729675574597004, "sequence-number": 1, "timestamp-ms": 1555100955770, "manifest-list": "s3://a/b/2.avro", "summary": {"operation": "append"}, "schema-id": 1}], "snapshot-log": [{"snapshot-id": 3051729675574597004, "timestamp-ms": 1515100955770}, {"snapshot-id": 3055729675574597004, "timestamp-ms": 1555100955770}], "metadata-log": [{"metadata-file": "s3://bucket/.../v1.json", "timestamp-ms": 1515100}], "sort-orders": [{"order-id": 3, "fields": [{"source-id": 2, "transform": "identity", "direction": "asc", "null-order": "nulls-first"}, {"source-id": 3, "transform": "bucket[4]", "direction": "desc", "null-order": "nulls-last"}]}], "default-sort-order-id": 3, "refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}, "main": {"snapshot-id": 3055729675574597004, "type": "branch"}}, "format-version": 2, "last-sequence-number": 34}`, + string(data)) +} + +func TestInvalidFormatVersion(t *testing.T) { + metadataInvalidFormat := `{ + "format-version": -1, + "table-uuid": 
"d20125c8-7284-442c-9aea-15fee620737c", + "location": "s3://bucket/test/location", + "last-updated-ms": 1602638573874, + "last-column-id": 3, + "schema": { + "type": "struct", + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + }, + "partition-spec": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [] + }` + + _, err := table.ParseMetadataBytes([]byte(metadataInvalidFormat)) + assert.Error(t, err) + assert.ErrorIs(t, err, table.ErrInvalidMetadataFormatVersion) +} + +func TestCurrentSchemaNotFound(t *testing.T) { + schemaNotFound := `{ + "format-version": 2, + "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", + "location": "s3://bucket/test/location", + "last-updated-ms": 1602638573874, + "last-column-id": 3, + "schemas": [ + {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": true, "type": "long"}]}, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [1, 2], + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + } + ], + "current-schema-id": 2, + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [] + }` + + _, err := table.ParseMetadataBytes([]byte(schemaNotFound)) + assert.Error(t, err) + assert.ErrorIs(t, err, table.ErrInvalidMetadata) + assert.ErrorContains(t, err, "current-schema-id 2 can't be found in any schema") +} + +func TestSortOrderNotFound(t *testing.T) { + metadataSortOrderNotFound := `{ + "format-version": 2, + "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", + "location": "s3://bucket/test/location", + "last-updated-ms": 1602638573874, + "last-column-id": 3, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "identifier-field-ids": [1, 2], + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + } + ], + "default-sort-order-id": 4, + "sort-orders": [ + { + "order-id": 3, + "fields": [ + {"transform": "identity", "source-id": 2, "direction": "asc", "null-order": "nulls-first"}, + {"transform": "bucket[4]", "source-id": 3, "direction": "desc", "null-order": "nulls-last"} + ] + } + ], + "current-schema-id": 0, + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], + "last-partition-id": 1000, + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [] + }` + + _, err := table.ParseMetadataBytes([]byte(metadataSortOrderNotFound)) + assert.Error(t, err) + assert.ErrorIs(t, err, table.ErrInvalidMetadata) + assert.ErrorContains(t, err, "default-sort-order-id 4 can't be found in [3: [\n2 asc nulls-first\nbucket[4](3) desc nulls-last\n]]") +} + +func TestSortOrderUnsorted(t *testing.T) { + sortOrderUnsorted := `{ + "format-version": 2, + "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", + "location": "s3://bucket/test/location", + 
"last-updated-ms": 1602638573874, + "last-column-id": 3, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "identifier-field-ids": [1, 2], + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + } + ], + "default-sort-order-id": 0, + "sort-orders": [], + "current-schema-id": 0, + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], + "last-partition-id": 1000, + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [] + }` + + var meta table.MetadataV2 + require.NoError(t, json.Unmarshal([]byte(sortOrderUnsorted), &meta)) + + assert.Equal(t, table.UnsortedSortOrderID, meta.DefaultSortOrderID) + assert.Len(t, meta.SortOrderList, 0) +} + +func TestInvalidPartitionSpecID(t *testing.T) { + invalidSpecID := `{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 1, + "schemas": [ + {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": true, "type": "long"}]}, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [1, 2], + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + } + ], + "sort-orders": [], + "default-sort-order-id": 0, + "default-spec-id": 1, + "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], + "last-partition-id": 1000 + }` + + var meta table.MetadataV2 + err := json.Unmarshal([]byte(invalidSpecID), &meta) + assert.ErrorIs(t, err, table.ErrInvalidMetadata) + assert.ErrorContains(t, err, "default-spec-id 1 can't be found") +} + +func TestV2RefCreation(t *testing.T) { + var meta table.MetadataV2 + require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2), &meta)) + + maxRefAge := int64(10000000) + assert.Equal(t, map[string]table.SnapshotRef{ + "main": { + SnapshotID: 3055729675574597004, + SnapshotRefType: table.BranchRef, + }, + "test": { + SnapshotID: 3051729675574597004, + SnapshotRefType: table.TagRef, + MaxRefAgeMs: &maxRefAge, + }, + }, meta.Refs) +} + +func TestV1WriteMetadataToV2(t *testing.T) { + // https://iceberg.apache.org/spec/#version-2 + // + // Table metadata JSON: + // - last-sequence-number was added and is required; default to 0 when reading v1 metadata + // - table-uuid is now required + // - current-schema-id is now required + // - schemas is now required + // - partition-specs is now required + // - default-spec-id is now required + // - last-partition-id is now required + // - sort-orders is now required + // - default-sort-order-id is now required + // - schema is no longer required and should be omitted; use schemas and current-schema-id instead + // - partition-spec is no longer required and should be omitted; use partition-specs and default-spec-id instead + + minimalV1Example := `{ + "format-version": 1, + "location": "s3://bucket/test/location", + "last-updated-ms": 1062638573874, + "last-column-id": 3, + "schema": { + "type": "struct", + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": 
"y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + }, + "partition-spec": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}] + }` + + meta, err := table.ParseMetadataString(minimalV1Example) + require.NoError(t, err) + assert.IsType(t, (*table.MetadataV1)(nil), meta) + + metaV2 := meta.(*table.MetadataV1).ToV2() + metaV2Json, err := json.Marshal(metaV2) + require.NoError(t, err) + + rawData := make(map[string]any) + require.NoError(t, json.Unmarshal(metaV2Json, &rawData)) + + assert.EqualValues(t, 0, rawData["last-sequence-number"]) + assert.NotEmpty(t, rawData["table-uuid"]) + assert.EqualValues(t, 0, rawData["current-schema-id"]) + assert.Equal(t, []any{map[string]any{ + "fields": []any{ + map[string]any{"id": float64(1), "name": "x", "required": true, "type": "long"}, + map[string]any{"id": float64(2), "name": "y", "required": true, "type": "long", "doc": "comment"}, + map[string]any{"id": float64(3), "name": "z", "required": true, "type": "long"}, + }, + "identifier-field-ids": []any{}, + "schema-id": float64(0), + "type": "struct", + }}, rawData["schemas"]) + assert.Equal(t, []any{map[string]any{ + "spec-id": float64(0), + "fields": []any{map[string]any{ + "name": "x", "transform": "identity", + "source-id": float64(1), "field-id": float64(1000), + }}, + }}, rawData["partition-specs"]) + + assert.Zero(t, rawData["default-spec-id"]) + assert.EqualValues(t, 1000, rawData["last-partition-id"]) + assert.Zero(t, rawData["default-sort-order-id"]) + assert.Equal(t, []any{map[string]any{"order-id": float64(0), "fields": []any{}}}, rawData["sort-orders"]) + assert.NotContains(t, rawData, "schema") + assert.NotContains(t, rawData, "partition-spec") +} diff --git a/table/refs.go b/table/refs.go new file mode 100644 index 0000000..cf63efc --- /dev/null +++ b/table/refs.go @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package table
+
+import (
+	"encoding/json"
+	"errors"
+)
+
+const MainBranch = "main"
+
+// RefType will be either a BranchRef or a TagRef
+type RefType string
+
+const (
+	BranchRef RefType = "branch"
+	TagRef    RefType = "tag"
+)
+
+var (
+	ErrInvalidRefType = errors.New("invalid snapshot ref type, should be 'branch' or 'tag'")
+)
+
+// SnapshotRef represents the reference information for a specific snapshot
+type SnapshotRef struct {
+	SnapshotID         int64   `json:"snapshot-id"`
+	SnapshotRefType    RefType `json:"type"`
+	MinSnapshotsToKeep *int    `json:"min-snapshots-to-keep,omitempty"`
+	MaxSnapshotAgeMs   *int64  `json:"max-snapshot-age-ms,omitempty"`
+	MaxRefAgeMs        *int64  `json:"max-ref-age-ms,omitempty"`
+}
+
+func (s *SnapshotRef) UnmarshalJSON(b []byte) error {
+	type Alias SnapshotRef
+	aux := (*Alias)(s)
+
+	if err := json.Unmarshal(b, aux); err != nil {
+		return err
+	}
+
+	switch s.SnapshotRefType {
+	case BranchRef, TagRef:
+	default:
+		return ErrInvalidRefType
+	}
+
+	return nil
+}
diff --git a/table/refs_test.go b/table/refs_test.go
new file mode 100644
index 0000000..d8b54e4
--- /dev/null
+++ b/table/refs_test.go
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
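Reviewer note: a small sketch (illustrative only) of how SnapshotRef behaves over a JSON round trip — the custom UnmarshalJSON rejects unknown ref types, and the omitempty retention fields stay out of the output when unset.

package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/apache/iceberg-go/table"
)

func main() {
	var ref table.SnapshotRef
	// "type" must be "branch" or "tag"; anything else yields ErrInvalidRefType.
	if err := json.Unmarshal([]byte(`{"snapshot-id": 3051729675574597004, "type": "branch"}`), &ref); err != nil {
		log.Fatal(err)
	}

	fmt.Println(ref.SnapshotRefType == table.BranchRef) // true
	fmt.Println(ref.MinSnapshotsToKeep == nil)          // true: optional retention fields stay nil

	out, _ := json.Marshal(ref)
	fmt.Println(string(out)) // {"snapshot-id":3051729675574597004,"type":"branch"}
}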
+ +package table_test + +import ( + "encoding/json" + "testing" + + "github.com/apache/iceberg-go/table" + "github.com/stretchr/testify/assert" +) + +func TestInvalidSnapshotRefType(t *testing.T) { + ref := `{ + "snapshot-id": 3051729675574597004, + "type": "foobar" + }` + + var snapRef table.SnapshotRef + err := json.Unmarshal([]byte(ref), &snapRef) + assert.ErrorIs(t, err, table.ErrInvalidRefType) +} + +func TestSnapshotBranchRef(t *testing.T) { + ref := `{ + "snapshot-id": 3051729675574597004, + "type": "branch" + }` + + var snapRef table.SnapshotRef + err := json.Unmarshal([]byte(ref), &snapRef) + assert.NoError(t, err) + + assert.Equal(t, table.BranchRef, snapRef.SnapshotRefType) + assert.Equal(t, int64(3051729675574597004), snapRef.SnapshotID) + assert.Nil(t, snapRef.MinSnapshotsToKeep) + assert.Nil(t, snapRef.MaxRefAgeMs) + assert.Nil(t, snapRef.MaxSnapshotAgeMs) +} + +func TestSnapshotTagRef(t *testing.T) { + ref := `{ + "snapshot-id": 3051729675574597004, + "type": "tag", + "min-snapshots-to-keep": 10 + }` + + var snapRef table.SnapshotRef + err := json.Unmarshal([]byte(ref), &snapRef) + assert.NoError(t, err) + + assert.Equal(t, table.TagRef, snapRef.SnapshotRefType) + assert.Equal(t, int64(3051729675574597004), snapRef.SnapshotID) + assert.Equal(t, 10, *snapRef.MinSnapshotsToKeep) + assert.Nil(t, snapRef.MaxRefAgeMs) + assert.Nil(t, snapRef.MaxSnapshotAgeMs) +} diff --git a/table/snapshots.go b/table/snapshots.go new file mode 100644 index 0000000..26dc8d2 --- /dev/null +++ b/table/snapshots.go @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "encoding/json" + "errors" + "fmt" + "strconv" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/io" + "golang.org/x/exp/maps" +) + +type Operation string + +const ( + OpAppend Operation = "append" + OpReplace Operation = "replace" + OpOverwrite Operation = "overwrite" + OpDelete Operation = "delete" +) + +var ( + ErrInvalidOperation = errors.New("invalid operation value") + ErrMissingOperation = errors.New("missing operation key") +) + +// ValidOperation ensures that a given string is one of the valid operation +// types: append,replace,overwrite,delete +func ValidOperation(s string) (Operation, error) { + switch s { + case "append", "replace", "overwrite", "delete": + return Operation(s), nil + } + return "", fmt.Errorf("%w: found '%s'", ErrInvalidOperation, s) +} + +const operationKey = "operation" + +// Summary stores the summary information for a snapshot indicating +// the operation that created the snapshot, and various properties +// which might exist in the summary. 
+type Summary struct { + Operation Operation + Properties map[string]string +} + +func (s *Summary) String() string { + out := string(s.Operation) + if s.Properties != nil { + data, _ := json.Marshal(s.Properties) + out += ", " + string(data) + } + return out +} + +func (s *Summary) Equals(other *Summary) bool { + if s == other { + return true + } + + if s != nil && other == nil { + return false + } + + if s.Operation != other.Operation { + return false + } + + if len(s.Properties) == 0 && len(other.Properties) == 0 { + return true + } + + return maps.Equal(s.Properties, other.Properties) +} + +func (s *Summary) UnmarshalJSON(b []byte) (err error) { + alias := map[string]string{} + if err = json.Unmarshal(b, &alias); err != nil { + return + } + + op, ok := alias[operationKey] + if !ok { + return ErrMissingOperation + } + + if s.Operation, err = ValidOperation(op); err != nil { + return + } + + delete(alias, operationKey) + s.Properties = alias + return nil +} + +func (s *Summary) MarshalJSON() ([]byte, error) { + props := maps.Clone(s.Properties) + if s.Operation != "" { + if props == nil { + props = make(map[string]string) + } + props[operationKey] = string(s.Operation) + } + + return json.Marshal(props) +} + +type Snapshot struct { + SnapshotID int64 `json:"snapshot-id"` + ParentSnapshotID *int64 `json:"parent-snapshot-id,omitempty"` + SequenceNumber int64 `json:"sequence-number"` + TimestampMs int64 `json:"timestamp-ms"` + ManifestList string `json:"manifest-list,omitempty"` + Summary *Summary `json:"summary,omitempty"` + SchemaID *int `json:"schema-id,omitempty"` +} + +func (s Snapshot) String() string { + var ( + op, parent, schema string + ) + + if s.Summary != nil { + op = s.Summary.String() + ": " + } + if s.ParentSnapshotID != nil { + parent = ", parent_id=" + strconv.FormatInt(*s.ParentSnapshotID, 10) + } + if s.SchemaID != nil { + schema = ", schema_id=" + strconv.Itoa(*s.SchemaID) + } + return fmt.Sprintf("%sid=%d%s%s, sequence_number=%d, timestamp_ms=%d, manifest_list=%s", + op, s.SnapshotID, parent, schema, s.SequenceNumber, s.TimestampMs, s.ManifestList) +} + +func (s Snapshot) Equals(other Snapshot) bool { + switch { + case s.ParentSnapshotID == nil && other.ParentSnapshotID != nil: + fallthrough + case s.ParentSnapshotID != nil && other.ParentSnapshotID == nil: + fallthrough + case s.SchemaID == nil && other.SchemaID != nil: + fallthrough + case s.SchemaID != nil && other.SchemaID == nil: + return false + } + + return s.SnapshotID == other.SnapshotID && + ((s.ParentSnapshotID == other.ParentSnapshotID) || (*s.ParentSnapshotID == *other.ParentSnapshotID)) && + ((s.SchemaID == other.SchemaID) || (*s.SchemaID == *other.SchemaID)) && + s.SequenceNumber == other.SequenceNumber && + s.TimestampMs == other.TimestampMs && + s.ManifestList == other.ManifestList && + s.Summary.Equals(other.Summary) +} + +func (s Snapshot) Manifests(fio io.IO) ([]iceberg.ManifestFile, error) { + if s.ManifestList != "" { + f, err := fio.Open(s.ManifestList) + if err != nil { + return nil, fmt.Errorf("could not open manifest file: %w", err) + } + defer f.Close() + return iceberg.ReadManifestList(f) + } + + return nil, nil +} + +type MetadataLogEntry struct { + MetadataFile string `json:"metadata-file"` + TimestampMs int64 `json:"timestamp-ms"` +} + +type SnapshotLogEntry struct { + SnapshotID int64 `json:"snapshot-id"` + TimestampMs int64 `json:"timestamp-ms"` +} diff --git a/table/snapshots_test.go b/table/snapshots_test.go new file mode 100644 index 0000000..6a101a5 --- /dev/null +++ 
b/table/snapshots_test.go @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table_test + +import ( + "encoding/json" + "testing" + + "github.com/apache/iceberg-go/table" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Snapshot() table.Snapshot { + parentID := int64(19) + manifest, schemaid := "s3:/a/b/c.avro", 3 + return table.Snapshot{ + SnapshotID: 25, + ParentSnapshotID: &parentID, + SequenceNumber: 200, + TimestampMs: 1602638573590, + ManifestList: manifest, + SchemaID: &schemaid, + Summary: &table.Summary{ + Operation: table.OpAppend, + }, + } +} + +func SnapshotWithProperties() table.Snapshot { + parentID := int64(19) + manifest, schemaid := "s3:/a/b/c.avro", 3 + return table.Snapshot{ + SnapshotID: 25, + ParentSnapshotID: &parentID, + SequenceNumber: 200, + TimestampMs: 1602638573590, + ManifestList: manifest, + SchemaID: &schemaid, + Summary: &table.Summary{ + Operation: table.OpAppend, + Properties: map[string]string{"foo": "bar"}, + }, + } +} + +func TestSerializeSnapshot(t *testing.T) { + snapshot := Snapshot() + data, err := json.Marshal(snapshot) + require.NoError(t, err) + + assert.JSONEq(t, `{ + "snapshot-id": 25, + "parent-snapshot-id": 19, + "sequence-number": 200, + "timestamp-ms": 1602638573590, + "manifest-list": "s3:/a/b/c.avro", + "summary": {"operation": "append"}, + "schema-id": 3 + }`, string(data)) +} + +func TestSerializeSnapshotWithProps(t *testing.T) { + snapshot := SnapshotWithProperties() + data, err := json.Marshal(snapshot) + require.NoError(t, err) + + assert.JSONEq(t, `{ + "snapshot-id": 25, + "parent-snapshot-id": 19, + "sequence-number": 200, + "timestamp-ms": 1602638573590, + "manifest-list": "s3:/a/b/c.avro", + "summary": {"operation": "append", "foo": "bar"}, + "schema-id": 3 + }`, string(data)) +} + +func TestMissingOperation(t *testing.T) { + var summary table.Summary + err := json.Unmarshal([]byte(`{"foo": "bar"}`), &summary) + assert.ErrorIs(t, err, table.ErrMissingOperation) +} + +func TestInvalidOperation(t *testing.T) { + var summary table.Summary + err := json.Unmarshal([]byte(`{"operation": "foobar"}`), &summary) + assert.ErrorIs(t, err, table.ErrInvalidOperation) + assert.ErrorContains(t, err, "found 'foobar'") +} + +func TestSnapshotString(t *testing.T) { + snapshot := Snapshot() + assert.Equal(t, `append: id=25, parent_id=19, schema_id=3, sequence_number=200, timestamp_ms=1602638573590, manifest_list=s3:/a/b/c.avro`, + snapshot.String()) + + snapshot = SnapshotWithProperties() + assert.Equal(t, `append, {"foo":"bar"}: id=25, parent_id=19, schema_id=3, sequence_number=200, timestamp_ms=1602638573590, manifest_list=s3:/a/b/c.avro`, + snapshot.String()) +} diff --git a/table/sorting.go b/table/sorting.go new file mode 100644 
index 0000000..89bc76c --- /dev/null +++ b/table/sorting.go @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/apache/iceberg-go" +) + +type SortDirection string + +const ( + SortASC SortDirection = "asc" + SortDESC SortDirection = "desc" +) + +type NullOrder string + +const ( + NullsFirst NullOrder = "nulls-first" + NullsLast NullOrder = "nulls-last" +) + +var ( + ErrInvalidSortDirection = errors.New("invalid sort direction, must be 'asc' or 'desc'") + ErrInvalidNullOrder = errors.New("invalid null order, must be 'nulls-first' or 'nulls-last'") +) + +// SortField describes a field used in a sort order definition. +type SortField struct { + // SourceID is the source column id from the table's schema + SourceID int `json:"source-id"` + // Transform is the tranformation used to produce values to be + // sorted on from the source column. + Transform iceberg.Transform `json:"transform"` + // Direction is an enum indicating ascending or descending direction. + Direction SortDirection `json:"direction"` + // NullOrder describes the order of null values when sorting + // should be only either nulls-first or nulls-last enum values. + NullOrder NullOrder `json:"null-order"` +} + +func (s *SortField) String() string { + if _, ok := s.Transform.(iceberg.IdentityTransform); ok { + return fmt.Sprintf("%d %s %s", s.SourceID, s.Direction, s.NullOrder) + } + return fmt.Sprintf("%s(%d) %s %s", s.Transform, s.SourceID, s.Direction, s.NullOrder) +} + +func (s *SortField) MarshalJSON() ([]byte, error) { + if s.Direction == "" { + s.Direction = SortASC + } + + if s.NullOrder == "" { + if s.Direction == SortASC { + s.NullOrder = NullsFirst + } else { + s.NullOrder = NullsLast + } + } + + type Alias SortField + return json.Marshal((*Alias)(s)) +} + +func (s *SortField) UnmarshalJSON(b []byte) error { + type Alias SortField + var aux = struct { + TransformString string `json:"transform"` + *Alias + }{ + Alias: (*Alias)(s), + } + + err := json.Unmarshal(b, &aux) + if err != nil { + return err + } + + if s.Transform, err = iceberg.ParseTransform(aux.TransformString); err != nil { + return err + } + + switch s.Direction { + case SortASC, SortDESC: + default: + return ErrInvalidSortDirection + } + + switch s.NullOrder { + case NullsFirst, NullsLast: + default: + return ErrInvalidNullOrder + } + + return nil +} + +const ( + InitialSortOrderID = 1 + UnsortedSortOrderID = 0 +) + +// A default Sort Order indicating no sort order at all +var UnsortedSortOrder = SortOrder{OrderID: UnsortedSortOrderID, Fields: []SortField{}} + +// SortOrder describes how the data is sorted within the table. +// +// Data can be sorted within partitions by columns to gain performance. 
The +// order of the sort fields within the list defines the order in which the +// sort is applied to the data. +type SortOrder struct { + OrderID int `json:"order-id"` + Fields []SortField `json:"fields"` +} + +func (s SortOrder) String() string { + var b strings.Builder + fmt.Fprintf(&b, "%d: ", s.OrderID) + b.WriteByte('[') + for i, f := range s.Fields { + if i == 0 { + b.WriteByte('\n') + } + b.WriteString(f.String()) + b.WriteByte('\n') + } + b.WriteByte(']') + return b.String() +} + +func (s *SortOrder) UnmarshalJSON(b []byte) error { + type Alias SortOrder + aux := (*Alias)(s) + + if err := json.Unmarshal(b, aux); err != nil { + return err + } + + if len(s.Fields) == 0 { + s.Fields = []SortField{} + s.OrderID = 0 + return nil + } + + if s.OrderID == 0 { + s.OrderID = InitialSortOrderID // initialize default sort order id + } + + return nil +} diff --git a/table/sorting_test.go b/table/sorting_test.go new file mode 100644 index 0000000..c12c8ff --- /dev/null +++ b/table/sorting_test.go @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
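Reviewer note: an illustrative sketch (not part of the change) of the SortField marshalling defaults — a field declared with only a source id and transform serializes with asc / nulls-first filled in.

package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/table"
)

func main() {
	order := table.SortOrder{
		OrderID: 1,
		Fields: []table.SortField{
			// Direction and NullOrder are left unset; MarshalJSON defaults them
			// to asc / nulls-first (a desc field would default to nulls-last).
			{SourceID: 2, Transform: iceberg.IdentityTransform{}},
		},
	}

	out, err := json.Marshal(order)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(out))
	// {"order-id":1,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"}]}
}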
+ +package table_test + +import ( + "encoding/json" + "testing" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var sortOrder = table.SortOrder{ + OrderID: 22, + Fields: []table.SortField{ + {SourceID: 19, Transform: iceberg.IdentityTransform{}, NullOrder: table.NullsFirst}, + {SourceID: 25, Transform: iceberg.BucketTransform{NumBuckets: 4}, Direction: table.SortDESC}, + {SourceID: 22, Transform: iceberg.VoidTransform{}, Direction: table.SortASC}, + }, +} + +func TestSerializeUnsortedSortOrder(t *testing.T) { + data, err := json.Marshal(table.UnsortedSortOrder) + require.NoError(t, err) + assert.JSONEq(t, `{"order-id": 0, "fields": []}`, string(data)) +} + +func TestSerializeSortOrder(t *testing.T) { + data, err := json.Marshal(sortOrder) + require.NoError(t, err) + assert.JSONEq(t, `{ + "order-id": 22, + "fields": [ + {"source-id": 19, "transform": "identity", "direction": "asc", "null-order": "nulls-first"}, + {"source-id": 25, "transform": "bucket[4]", "direction": "desc", "null-order": "nulls-last"}, + {"source-id": 22, "transform": "void", "direction": "asc", "null-order": "nulls-first"} + ] + }`, string(data)) +} + +func TestUnmarshalSortOrderDefaults(t *testing.T) { + var order table.SortOrder + require.NoError(t, json.Unmarshal([]byte(`{"fields": []}`), &order)) + assert.Equal(t, table.UnsortedSortOrder, order) + + require.NoError(t, json.Unmarshal([]byte(`{"fields": [{"source-id": 19, "transform": "identity", "direction": "asc", "null-order": "nulls-first"}]}`), &order)) + assert.Equal(t, table.InitialSortOrderID, order.OrderID) +} + +func TestUnmarshalInvalidSortDirection(t *testing.T) { + badJson := `{ + "order-id": 22, + "fields": [ + {"source-id": 19, "transform": "identity", "direction": "foobar", "null-order": "nulls-first"}, + {"source-id": 25, "transform": "bucket[4]", "direction": "desc", "null-order": "nulls-last"}, + {"source-id": 22, "transform": "void", "direction": "asc", "null-order": "nulls-first"} + ] + }` + + var order table.SortOrder + err := json.Unmarshal([]byte(badJson), &order) + assert.ErrorIs(t, err, table.ErrInvalidSortDirection) +} + +func TestUnmarshalInvalidSortNullOrder(t *testing.T) { + badJson := `{ + "order-id": 22, + "fields": [ + {"source-id": 19, "transform": "identity", "direction": "asc", "null-order": "foobar"}, + {"source-id": 25, "transform": "bucket[4]", "direction": "desc", "null-order": "nulls-last"}, + {"source-id": 22, "transform": "void", "direction": "asc", "null-order": "nulls-first"} + ] + }` + + var order table.SortOrder + err := json.Unmarshal([]byte(badJson), &order) + assert.ErrorIs(t, err, table.ErrInvalidNullOrder) +} + +func TestUnmarshalInvalidSortTransform(t *testing.T) { + badJson := `{ + "order-id": 22, + "fields": [ + {"source-id": 19, "transform": "foobar", "direction": "asc", "null-order": "nulls-first"}, + {"source-id": 25, "transform": "bucket[4]", "direction": "desc", "null-order": "nulls-last"}, + {"source-id": 22, "transform": "void", "direction": "asc", "null-order": "nulls-first"} + ] + }` + + var order table.SortOrder + err := json.Unmarshal([]byte(badJson), &order) + assert.ErrorIs(t, err, iceberg.ErrInvalidTransform) +} diff --git a/table/table.go b/table/table.go new file mode 100644 index 0000000..80b68b3 --- /dev/null +++ b/table/table.go @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "reflect" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/io" + "golang.org/x/exp/slices" +) + +type Identifier = []string + +type Table struct { + identifier Identifier + metadata Metadata + metadataLocation string + fs io.IO +} + +func (t Table) Equals(other Table) bool { + return slices.Equal(t.identifier, other.identifier) && + t.metadataLocation == other.metadataLocation && + reflect.DeepEqual(t.metadata, other.metadata) +} + +func (t Table) Identifier() Identifier { return t.identifier } +func (t Table) Metadata() Metadata { return t.metadata } +func (t Table) MetadataLocation() string { return t.metadataLocation } +func (t Table) FS() io.IO { return t.fs } + +func (t Table) Schema() *iceberg.Schema { return t.metadata.CurrentSchema() } +func (t Table) Spec() iceberg.PartitionSpec { return t.metadata.PartitionSpec() } +func (t Table) SortOrder() SortOrder { return t.metadata.SortOrder() } +func (t Table) Properties() iceberg.Properties { return t.metadata.Properties() } +func (t Table) Location() string { return t.metadata.Location() } +func (t Table) CurrentSnapshot() *Snapshot { return t.metadata.CurrentSnapshot() } +func (t Table) SnapshotByID(id int64) *Snapshot { return t.metadata.SnapshotByID(id) } +func (t Table) SnapshotByName(name string) *Snapshot { return t.metadata.SnapshotByName(name) } +func (t Table) Schemas() map[int]*iceberg.Schema { + m := make(map[int]*iceberg.Schema) + for _, s := range t.metadata.Schemas() { + m[s.ID] = s + } + return m +} + +func New(ident Identifier, meta Metadata, location string, fs io.IO) *Table { + return &Table{ + identifier: ident, + metadata: meta, + metadataLocation: location, + fs: fs, + } +} + +func NewFromLocation(ident Identifier, metalocation string, fsys io.IO) (*Table, error) { + var meta Metadata + + if rf, ok := fsys.(io.ReadFileIO); ok { + data, err := rf.ReadFile(metalocation) + if err != nil { + return nil, err + } + + if meta, err = ParseMetadataBytes(data); err != nil { + return nil, err + } + } else { + f, err := fsys.Open(metalocation) + if err != nil { + return nil, err + } + defer f.Close() + + if meta, err = ParseMetadata(f); err != nil { + return nil, err + } + } + return New(ident, meta, metalocation, fsys), nil +} diff --git a/table/table_test.go b/table/table_test.go new file mode 100644 index 0000000..cde94ab --- /dev/null +++ b/table/table_test.go @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table_test + +import ( + "bytes" + "testing" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/internal" + "github.com/apache/iceberg-go/table" + "github.com/stretchr/testify/suite" +) + +type TableTestSuite struct { + suite.Suite + + tbl *table.Table +} + +func TestTable(t *testing.T) { + suite.Run(t, new(TableTestSuite)) +} + +func (t *TableTestSuite) SetupSuite() { + var mockfs internal.MockFS + mockfs.Test(t.T()) + mockfs.On("Open", "s3://bucket/test/location/uuid.metadata.json"). + Return(&internal.MockFile{Contents: bytes.NewReader([]byte(ExampleTableMetadataV2))}, nil) + defer mockfs.AssertExpectations(t.T()) + + tbl, err := table.NewFromLocation([]string{"foo"}, "s3://bucket/test/location/uuid.metadata.json", &mockfs) + t.Require().NoError(err) + t.Require().NotNil(tbl) + + t.Equal([]string{"foo"}, tbl.Identifier()) + t.Equal("s3://bucket/test/location/uuid.metadata.json", tbl.MetadataLocation()) + t.Equal(&mockfs, tbl.FS()) + + t.tbl = tbl +} + +func (t *TableTestSuite) TestNewTableFromReadFile() { + var mockfsReadFile internal.MockFSReadFile + mockfsReadFile.Test(t.T()) + mockfsReadFile.On("ReadFile", "s3://bucket/test/location/uuid.metadata.json"). + Return([]byte(ExampleTableMetadataV2), nil) + defer mockfsReadFile.AssertExpectations(t.T()) + + tbl2, err := table.NewFromLocation([]string{"foo"}, "s3://bucket/test/location/uuid.metadata.json", &mockfsReadFile) + t.Require().NoError(err) + t.Require().NotNil(tbl2) + + t.True(t.tbl.Equals(*tbl2)) +} + +func (t *TableTestSuite) TestSchema() { + t.True(t.tbl.Schema().Equals(iceberg.NewSchemaWithIdentifiers(1, []int{1, 2}, + iceberg.NestedField{ID: 1, Name: "x", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + iceberg.NestedField{ID: 2, Name: "y", Type: iceberg.PrimitiveTypes.Int64, Required: true, Doc: "comment"}, + iceberg.NestedField{ID: 3, Name: "z", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + ))) +} + +func (t *TableTestSuite) TestPartitionSpec() { + t.Equal(iceberg.NewPartitionSpec( + iceberg.PartitionField{SourceID: 1, FieldID: 1000, Transform: iceberg.IdentityTransform{}, Name: "x"}, + ), t.tbl.Spec()) +} + +func (t *TableTestSuite) TestSortOrder() { + t.Equal(table.SortOrder{ + OrderID: 3, + Fields: []table.SortField{ + {SourceID: 2, Transform: iceberg.IdentityTransform{}, Direction: table.SortASC, NullOrder: table.NullsFirst}, + {SourceID: 3, Transform: iceberg.BucketTransform{NumBuckets: 4}, Direction: table.SortDESC, NullOrder: table.NullsLast}, + }, + }, t.tbl.SortOrder()) +} + +func (t *TableTestSuite) TestLocation() { + t.Equal("s3://bucket/test/location", t.tbl.Location()) +} + +func (t *TableTestSuite) TestSnapshot() { + var ( + parentSnapshotID int64 = 3051729675574597004 + one = 1 + manifestList = "s3://a/b/2.avro" + ) + + testSnapshot := table.Snapshot{ + SnapshotID: 3055729675574597004, + ParentSnapshotID: &parentSnapshotID, + SequenceNumber: 1, + TimestampMs: 1555100955770, + ManifestList: manifestList, + Summary: &table.Summary{Operation: table.OpAppend, Properties: map[string]string{}}, + SchemaID: &one, + } + 
t.True(testSnapshot.Equals(*t.tbl.CurrentSnapshot())) + + t.True(testSnapshot.Equals(*t.tbl.SnapshotByID(3055729675574597004))) +} + +func (t *TableTestSuite) TestSnapshotByName() { + testSnapshot := table.Snapshot{ + SnapshotID: 3051729675574597004, + TimestampMs: 1515100955770, + ManifestList: "s3://a/b/1.avro", + Summary: &table.Summary{Operation: table.OpAppend}, + } + + t.True(testSnapshot.Equals(*t.tbl.SnapshotByName("test"))) +}
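
For reviewers, a minimal usage sketch of the API added in this change follows. It only relies on identifiers introduced in the diff above (table.NewFromLocation, Identifier, and the Table accessors); the helper name describeTable, the {"db", "events"} identifier, and the assumption that the caller already holds a concrete io.IO implementation are illustrative only and not part of the change.

package main

import (
	"fmt"

	"github.com/apache/iceberg-go/io"
	"github.com/apache/iceberg-go/table"
)

// describeTable is a hypothetical helper, not part of the diff above. It
// assumes the caller has already constructed an io.IO for the warehouse
// storage and knows the location of the table's *.metadata.json file.
func describeTable(fsys io.IO, metadataLocation string) error {
	// The identifier {"db", "events"} is made up for illustration;
	// Identifier is simply an alias for []string in this change.
	tbl, err := table.NewFromLocation(table.Identifier{"db", "events"}, metadataLocation, fsys)
	if err != nil {
		return err
	}

	fmt.Println("location:", tbl.Location())
	fmt.Println("schema:", tbl.Schema())
	fmt.Println("sort order:", tbl.SortOrder())
	if snap := tbl.CurrentSnapshot(); snap != nil {
		fmt.Println("current snapshot:", snap.SnapshotID)
	}
	return nil
}

func main() {
	// Wiring up a concrete io.IO (local filesystem, S3, ...) is outside the
	// scope of this sketch, so main is intentionally left empty.
}

Note that NewFromLocation takes the ReadFileIO fast path when the supplied filesystem supports it and otherwise falls back to Open plus ParseMetadata, which is why the tests above exercise both internal.MockFS and internal.MockFSReadFile.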