From 27b2ee360fd8d2cb6011a3078dd8cfcf3a3e8595 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 26 Sep 2023 16:09:56 -0400 Subject: [PATCH 1/6] feat(tables): add basic table implementation --- table/metadata.go | 351 ++++++++++++++++++++++++++++ table/metadata_test.go | 491 ++++++++++++++++++++++++++++++++++++++++ table/refs.go | 61 +++++ table/refs_test.go | 37 +++ table/snapshots.go | 181 +++++++++++++++ table/snapshots_test.go | 105 +++++++++ table/sorting.go | 158 +++++++++++++ table/sorting_test.go | 110 +++++++++ table/table.go | 97 ++++++++ table/table_test.go | 130 +++++++++++ 10 files changed, 1721 insertions(+) create mode 100644 table/metadata.go create mode 100644 table/metadata_test.go create mode 100644 table/refs.go create mode 100644 table/refs_test.go create mode 100644 table/snapshots.go create mode 100644 table/snapshots_test.go create mode 100644 table/sorting.go create mode 100644 table/sorting_test.go create mode 100644 table/table.go create mode 100644 table/table_test.go diff --git a/table/metadata.go b/table/metadata.go new file mode 100644 index 0000000..30fea19 --- /dev/null +++ b/table/metadata.go @@ -0,0 +1,351 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package table + +import ( + "encoding/json" + "errors" + "fmt" + "io" + + "github.com/apache/iceberg-go" + + "github.com/google/uuid" +) + +type Metadata interface { + Version() int + TableUUID() uuid.UUID + Loc() string + LastUpdated() int + LastColumn() int + Schemas() []*iceberg.Schema + CurrentSchema() *iceberg.Schema + PartitionSpecs() []iceberg.PartitionSpec + PartitionSpec() iceberg.PartitionSpec + DefaultPartitionSpec() int + LastPartitionSpecID() *int + Snapshots() []Snapshot + SnapshotByID(int64) *Snapshot + SnapshotByName(name string) *Snapshot + CurrentSnapshot() *Snapshot + SortOrder() SortOrder + SortOrders() []SortOrder + Properties() iceberg.Properties +} + +var ( + ErrInvalidMetadataFormatVersion = errors.New("invalid or missing format-version in table metadata") + ErrInvalidMetadata = errors.New("invalid metadata") +) + +func ParseMetadata(r io.Reader) (Metadata, error) { + data, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + return ParseMetadataBytes(data) +} + +func ParseMetadataString(s string) (Metadata, error) { + return ParseMetadataBytes([]byte(s)) +} + +func ParseMetadataBytes(b []byte) (Metadata, error) { + ver := struct { + FormatVersion int `json:"format-version"` + }{} + if err := json.Unmarshal(b, &ver); err != nil { + return nil, err + } + + var ret Metadata + switch ver.FormatVersion { + case 1: + ret = &MetadataV1{} + case 2: + ret = &MetadataV2{} + default: + return nil, ErrInvalidMetadataFormatVersion + } + + return ret, json.Unmarshal(b, ret) +} + +// https://iceberg.apache.org/spec/#iceberg-table-spec +type commonMetadata struct { + FormatVersion int `json:"format-version"` + UUID uuid.UUID `json:"table-uuid"` + Location string `json:"location"` + LastUpdatedMS int `json:"last-updated-ms"` + LastColumnID int `json:"last-column-id"` + SchemaList []*iceberg.Schema `json:"schemas"` + CurrentSchemaID int `json:"current-schema-id"` + Specs []iceberg.PartitionSpec `json:"partition-specs"` + DefaultSpecID int 
`json:"default-spec-id"` + LastPartitionID *int `json:"last-partition-id,omitempty"` + Props iceberg.Properties `json:"properties"` + SnapshotList []Snapshot `json:"snapshots,omitempty"` + CurrentSnapshotID *int64 `json:"current-snapshot-id,omitempty"` + SnapshotLog []SnapshotLogEntry `json:"snapshot-log"` + MetadataLog []MetadataLogEntry `json:"metadata-log"` + SortOrderList []SortOrder `json:"sort-orders"` + DefaultSortOrderID int `json:"default-sort-order-id"` + Refs map[string]SnapshotRef `json:"refs"` +} + +func (c *commonMetadata) TableUUID() uuid.UUID { return c.UUID } +func (c *commonMetadata) Loc() string { return c.Location } +func (c *commonMetadata) LastUpdated() int { return c.LastUpdatedMS } +func (c *commonMetadata) LastColumn() int { return c.LastColumnID } +func (c *commonMetadata) Schemas() []*iceberg.Schema { return c.SchemaList } +func (c *commonMetadata) CurrentSchema() *iceberg.Schema { + for _, s := range c.SchemaList { + if s.ID == c.CurrentSchemaID { + return s + } + } + panic("should never get here") +} + +func (c *commonMetadata) PartitionSpecs() []iceberg.PartitionSpec { + return c.Specs +} + +func (c *commonMetadata) DefaultPartitionSpec() int { + return c.DefaultSpecID +} + +func (c *commonMetadata) PartitionSpec() iceberg.PartitionSpec { + for _, s := range c.Specs { + if s.ID() == c.DefaultSpecID { + return s + } + } + return *iceberg.UnpartitionedSpec +} + +func (c *commonMetadata) LastPartitionSpecID() *int { return c.LastPartitionID } +func (c *commonMetadata) Snapshots() []Snapshot { return c.SnapshotList } +func (c *commonMetadata) SnapshotByID(id int64) *Snapshot { + for i := range c.SnapshotList { + if c.SnapshotList[i].SnapshotID == id { + return &c.SnapshotList[i] + } + } + return nil +} + +func (c *commonMetadata) SnapshotByName(name string) *Snapshot { + if ref, ok := c.Refs[name]; ok { + return c.SnapshotByID(ref.SnapshotID) + } + return nil +} + +func (c *commonMetadata) CurrentSnapshot() *Snapshot { + if 
c.CurrentSnapshotID == nil { + return nil + } + return c.SnapshotByID(*c.CurrentSnapshotID) +} + +func (c *commonMetadata) SortOrders() []SortOrder { return c.SortOrderList } +func (c *commonMetadata) SortOrder() SortOrder { + for _, s := range c.SortOrderList { + if s.OrderID == c.DefaultSortOrderID { + return s + } + } + return UnsortedSortOrder +} + +func (c *commonMetadata) Properties() iceberg.Properties { + return c.Props +} + +func (c *commonMetadata) preValidate() { + if c.CurrentSnapshotID != nil && *c.CurrentSnapshotID == -1 { + // treat -1 as the same as nil, clean this up in pre-validation + // to make the validation logic simplified later + c.CurrentSnapshotID = nil + } + + if c.CurrentSnapshotID != nil { + if _, ok := c.Refs[MainBranch]; !ok { + c.Refs[MainBranch] = SnapshotRef{ + SnapshotID: *c.CurrentSnapshotID, + SnapshotRefType: BranchRef, + } + } + } + + if c.MetadataLog == nil { + c.MetadataLog = []MetadataLogEntry{} + } + + if c.Refs == nil { + c.Refs = make(map[string]SnapshotRef) + } + + if c.SnapshotLog == nil { + c.SnapshotLog = []SnapshotLogEntry{} + } +} + +func (c *commonMetadata) checkSchemas() error { + // check that current-schema-id is present in schemas + for _, s := range c.SchemaList { + if s.ID == c.CurrentSchemaID { + return nil + } + } + + return fmt.Errorf("%w: current-schema-id %d can't be found in any schema", + ErrInvalidMetadata, c.CurrentSchemaID) +} + +func (c *commonMetadata) checkPartitionSpecs() error { + for _, spec := range c.Specs { + if spec.ID() == c.DefaultSpecID { + return nil + } + } + + return fmt.Errorf("%w: default-spec-id %d can't be found", + ErrInvalidMetadata, c.DefaultSpecID) +} + +func (c *commonMetadata) checkSortOrders() error { + if c.DefaultSortOrderID == UnsortedSortOrderID { + return nil + } + + for _, o := range c.SortOrderList { + if o.OrderID == c.DefaultSortOrderID { + return nil + } + } + + return fmt.Errorf("%w: default-sort-order-id %d can't be found in %+v", + ErrInvalidMetadata, 
c.DefaultSortOrderID, c.SortOrderList) +} + +func (c *commonMetadata) validate() error { + if err := c.checkSchemas(); err != nil { + return err + } + + if err := c.checkPartitionSpecs(); err != nil { + return err + } + + if err := c.checkSortOrders(); err != nil { + return err + } + + switch { + case c.LastUpdatedMS == 0: + // last-updated-ms is required + return fmt.Errorf("%w: missing last-updated-ms", ErrInvalidMetadata) + case c.LastColumnID == 0: + // last-column-id is required + return fmt.Errorf("%w: missing last-column-id", ErrInvalidMetadata) + } + + return nil +} + +func (c *commonMetadata) Version() int { return c.FormatVersion } + +type MetadataV1 struct { + Schema iceberg.Schema `json:"schema"` + Partition []iceberg.PartitionField `json:"partition-spec"` + + commonMetadata +} + +func (m *MetadataV1) preValidate() { + if len(m.SchemaList) == 0 { + m.SchemaList = []*iceberg.Schema{&m.Schema} + } + + if len(m.Specs) == 0 { + m.Specs = []iceberg.PartitionSpec{ + iceberg.NewPartitionSpec(m.Partition...)} + m.DefaultSpecID = m.Specs[0].ID() + } + + if m.LastPartitionID == nil { + id := m.Specs[0].LastAssignedFieldID() + for _, spec := range m.Specs[1:] { + last := spec.LastAssignedFieldID() + if last > id { + id = last + } + } + m.LastPartitionID = &id + } + + if len(m.SortOrderList) == 0 { + m.SortOrderList = []SortOrder{UnsortedSortOrder} + } + + m.commonMetadata.preValidate() +} + +func (m *MetadataV1) UnmarshalJSON(b []byte) error { + type Alias MetadataV1 + aux := (*Alias)(m) + + if err := json.Unmarshal(b, aux); err != nil { + return err + } + + m.preValidate() + return m.validate() +} + +func (m *MetadataV1) ToV2() MetadataV2 { + commonOut := m.commonMetadata + commonOut.FormatVersion = 2 + if commonOut.UUID.String() == "" { + commonOut.UUID = uuid.New() + } + + return MetadataV2{commonMetadata: commonOut} +} + +type MetadataV2 struct { + LastSequenceNumber int `json:"last-sequence-number"` + + commonMetadata +} + +func (m *MetadataV2) 
UnmarshalJSON(b []byte) error { + type Alias MetadataV2 + aux := (*Alias)(m) + + if err := json.Unmarshal(b, aux); err != nil { + return err + } + + m.preValidate() + return m.validate() +} diff --git a/table/metadata_test.go b/table/metadata_test.go new file mode 100644 index 0000000..46f10f9 --- /dev/null +++ b/table/metadata_test.go @@ -0,0 +1,491 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package table_test + +import ( + "encoding/json" + "testing" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ExampleTableMetadataV2 = `{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 1, + "schemas": [ + {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": true, "type": "long"}]}, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [1, 2], + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + } + ], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], + "last-partition-id": 1000, + "default-sort-order-id": 3, + "sort-orders": [ + { + "order-id": 3, + "fields": [ + {"transform": "identity", "source-id": 2, "direction": "asc", "null-order": "nulls-first"}, + {"transform": "bucket[4]", "source-id": 3, "direction": "desc", "null-order": "nulls-last"} + ] + } + ], + "properties": {"read.split.target.size": "134217728"}, + "current-snapshot-id": 3055729675574597004, + "snapshots": [ + { + "snapshot-id": 3051729675574597004, + "timestamp-ms": 1515100955770, + "sequence-number": 0, + "summary": {"operation": "append"}, + "manifest-list": "s3://a/b/1.avro" + }, + { + "snapshot-id": 3055729675574597004, + "parent-snapshot-id": 3051729675574597004, + "timestamp-ms": 1555100955770, + "sequence-number": 1, + "summary": {"operation": "append"}, + "manifest-list": "s3://a/b/2.avro", + "schema-id": 1 + } + ], + "snapshot-log": [ + {"snapshot-id": 
3051729675574597004, "timestamp-ms": 1515100955770}, + {"snapshot-id": 3055729675574597004, "timestamp-ms": 1555100955770} + ], + "metadata-log": [{"metadata-file": "s3://bucket/.../v1.json", "timestamp-ms": 1515100}], + "refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}} +}` + +const ExampleTableMetadataV1 = `{ + "format-version": 1, + "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", + "location": "s3://bucket/test/location", + "last-updated-ms": 1602638573874, + "last-column-id": 3, + "schema": { + "type": "struct", + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + }, + "partition-spec": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}] +}` + +func TestMetadataV1Parsing(t *testing.T) { + meta, err := table.ParseMetadataBytes([]byte(ExampleTableMetadataV1)) + require.NoError(t, err) + require.NotNil(t, meta) + + assert.IsType(t, (*table.MetadataV1)(nil), meta) + assert.Equal(t, 1, meta.Version()) + + data := meta.(*table.MetadataV1) + assert.Equal(t, uuid.MustParse("d20125c8-7284-442c-9aea-15fee620737c"), meta.TableUUID()) + assert.Equal(t, "s3://bucket/test/location", meta.Loc()) + assert.Equal(t, 1602638573874, meta.LastUpdated()) + assert.Equal(t, 3, meta.LastColumn()) + + expected := iceberg.NewSchema( + 0, + iceberg.NestedField{ID: 1, Name: "x", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + iceberg.NestedField{ID: 2, Name: "y", Type: iceberg.PrimitiveTypes.Int64, Required: true, Doc: "comment"}, + iceberg.NestedField{ID: 3, Name: "z", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + ) + + assert.Equal(t, []*iceberg.Schema{expected}, meta.Schemas()) + assert.Zero(t, data.SchemaList[0].ID) 
+ assert.True(t, meta.CurrentSchema().Equals(expected)) + assert.Equal(t, []iceberg.PartitionSpec{ + iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 1, FieldID: 1000, Transform: iceberg.IdentityTransform{}, Name: "x", + }), + }, meta.PartitionSpecs()) + + assert.Equal(t, iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 1, FieldID: 1000, Transform: iceberg.IdentityTransform{}, Name: "x", + }), meta.PartitionSpec()) + + assert.Equal(t, 0, meta.DefaultPartitionSpec()) + assert.Equal(t, 1000, *meta.LastPartitionSpecID()) + assert.Nil(t, data.CurrentSnapshotID) + assert.Nil(t, meta.CurrentSnapshot()) + assert.Len(t, meta.Snapshots(), 1) + assert.NotNil(t, meta.SnapshotByID(1925)) + assert.Nil(t, meta.SnapshotByID(0)) + assert.Nil(t, meta.SnapshotByName("foo")) + assert.Zero(t, data.DefaultSortOrderID) + assert.Equal(t, table.UnsortedSortOrder, meta.SortOrder()) +} + +func TestMetadataV2Parsing(t *testing.T) { + meta, err := table.ParseMetadataBytes([]byte(ExampleTableMetadataV2)) + require.NoError(t, err) + require.NotNil(t, meta) + + assert.IsType(t, (*table.MetadataV2)(nil), meta) + assert.Equal(t, 2, meta.Version()) + + data := meta.(*table.MetadataV2) + assert.Equal(t, uuid.MustParse("9c12d441-03fe-4693-9a96-a0705ddf69c1"), data.UUID) + assert.Equal(t, "s3://bucket/test/location", data.Location) + assert.Equal(t, 34, data.LastSequenceNumber) + assert.Equal(t, 1602638573590, data.LastUpdatedMS) + assert.Equal(t, 3, data.LastColumnID) + assert.Equal(t, 0, data.SchemaList[0].ID) + assert.Equal(t, 1, data.CurrentSchemaID) + assert.Equal(t, 0, data.Specs[0].ID()) + assert.Equal(t, 0, data.DefaultSpecID) + assert.Equal(t, 1000, *data.LastPartitionID) + assert.EqualValues(t, "134217728", data.Props["read.split.target.size"]) + assert.EqualValues(t, 3055729675574597004, *data.CurrentSnapshotID) + assert.EqualValues(t, 3051729675574597004, data.SnapshotList[0].SnapshotID) + assert.Equal(t, 1515100955770, data.SnapshotLog[0].TimestampMs) + 
assert.Equal(t, 3, data.SortOrderList[0].OrderID) + assert.Equal(t, 3, data.DefaultSortOrderID) + + assert.Len(t, meta.Snapshots(), 2) + assert.Equal(t, data.SnapshotList[1], *meta.CurrentSnapshot()) + assert.Equal(t, data.SnapshotList[0], *meta.SnapshotByName("test")) + assert.EqualValues(t, "134217728", meta.Properties()["read.split.target.size"]) +} + +func TestParsingCorrectTypes(t *testing.T) { + var meta table.MetadataV2 + require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2), &meta)) + + assert.IsType(t, &iceberg.Schema{}, meta.SchemaList[0]) + assert.IsType(t, iceberg.NestedField{}, meta.SchemaList[0].Field(0)) + assert.IsType(t, iceberg.PrimitiveTypes.Int64, meta.SchemaList[0].Field(0).Type) +} + +func TestSerializeMetadataV1(t *testing.T) { + var meta table.MetadataV1 + require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV1), &meta)) + + data, err := json.Marshal(&meta) + require.NoError(t, err) + + assert.JSONEq(t, `{"location": "s3://bucket/test/location", "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", "last-updated-ms": 1602638573874, "last-column-id": 3, "schemas": [{"type": "struct", "fields": [{"id": 1, "name": "x", "type": "long", "required": true}, {"id": 2, "name": "y", "type": "long", "required": true, "doc": "comment"}, {"id": 3, "name": "z", "type": "long", "required": true}], "schema-id": 0, "identifier-field-ids": []}], "current-schema-id": 0, "partition-specs": [{"spec-id": 0, "fields": [{"source-id": 1, "field-id": 1000, "transform": "identity", "name": "x"}]}], "default-spec-id": 0, "last-partition-id": 1000, "properties": {}, "snapshots": [{"snapshot-id": 1925, "sequence-number": 0, "timestamp-ms": 1602638573822}], "snapshot-log": [], "metadata-log": [], "sort-orders": [{"order-id": 0, "fields": []}], "default-sort-order-id": 0, "refs": {}, "format-version": 1, "schema": {"type": "struct", "fields": [{"id": 1, "name": "x", "type": "long", "required": true}, {"id": 2, "name": "y", "type": "long", "required": 
true, "doc": "comment"}, {"id": 3, "name": "z", "type": "long", "required": true}], "schema-id": 0, "identifier-field-ids": []}, "partition-spec": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}`, + string(data)) +} + +func TestSerializeMetadataV2(t *testing.T) { + var meta table.MetadataV2 + require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2), &meta)) + + data, err := json.Marshal(&meta) + require.NoError(t, err) + + assert.JSONEq(t, `{"location": "s3://bucket/test/location", "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", "last-updated-ms": 1602638573590, "last-column-id": 3, "schemas": [{"type": "struct", "fields": [{"id": 1, "name": "x", "type": "long", "required": true}], "schema-id": 0, "identifier-field-ids": []}, {"type": "struct", "fields": [{"id": 1, "name": "x", "type": "long", "required": true}, {"id": 2, "name": "y", "type": "long", "required": true, "doc": "comment"}, {"id": 3, "name": "z", "type": "long", "required": true}], "schema-id": 1, "identifier-field-ids": [1, 2]}], "current-schema-id": 1, "partition-specs": [{"spec-id": 0, "fields": [{"source-id": 1, "field-id": 1000, "transform": "identity", "name": "x"}]}], "default-spec-id": 0, "last-partition-id": 1000, "properties": {"read.split.target.size": "134217728"}, "current-snapshot-id": 3055729675574597004, "snapshots": [{"snapshot-id": 3051729675574597004, "sequence-number": 0, "timestamp-ms": 1515100955770, "manifest-list": "s3://a/b/1.avro", "summary": {"operation": "append"}}, {"snapshot-id": 3055729675574597004, "parent-snapshot-id": 3051729675574597004, "sequence-number": 1, "timestamp-ms": 1555100955770, "manifest-list": "s3://a/b/2.avro", "summary": {"operation": "append"}, "schema-id": 1}], "snapshot-log": [{"snapshot-id": 3051729675574597004, "timestamp-ms": 1515100955770}, {"snapshot-id": 3055729675574597004, "timestamp-ms": 1555100955770}], "metadata-log": [{"metadata-file": "s3://bucket/.../v1.json", "timestamp-ms": 1515100}], 
"sort-orders": [{"order-id": 3, "fields": [{"source-id": 2, "transform": "identity", "direction": "asc", "null-order": "nulls-first"}, {"source-id": 3, "transform": "bucket[4]", "direction": "desc", "null-order": "nulls-last"}]}], "default-sort-order-id": 3, "refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}, "main": {"snapshot-id": 3055729675574597004, "type": "branch"}}, "format-version": 2, "last-sequence-number": 34}`, + string(data)) +} + +func TestInvalidFormatVersion(t *testing.T) { + metadataInvalidFormat := `{ + "format-version": -1, + "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", + "location": "s3://bucket/test/location", + "last-updated-ms": 1602638573874, + "last-column-id": 3, + "schema": { + "type": "struct", + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + }, + "partition-spec": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [] + }` + + _, err := table.ParseMetadataBytes([]byte(metadataInvalidFormat)) + assert.Error(t, err) + assert.ErrorIs(t, err, table.ErrInvalidMetadataFormatVersion) +} + +func TestCurrentSchemaNotFound(t *testing.T) { + schemaNotFound := `{ + "format-version": 2, + "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", + "location": "s3://bucket/test/location", + "last-updated-ms": 1602638573874, + "last-column-id": 3, + "schemas": [ + {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": true, "type": "long"}]}, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [1, 2], + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} 
+ ] + } + ], + "current-schema-id": 2, + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [] + }` + + _, err := table.ParseMetadataBytes([]byte(schemaNotFound)) + assert.Error(t, err) + assert.ErrorIs(t, err, table.ErrInvalidMetadata) + assert.ErrorContains(t, err, "current-schema-id 2 can't be found in any schema") +} + +func TestSortOrderNotFound(t *testing.T) { + metadataSortOrderNotFound := `{ + "format-version": 2, + "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", + "location": "s3://bucket/test/location", + "last-updated-ms": 1602638573874, + "last-column-id": 3, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "identifier-field-ids": [1, 2], + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + } + ], + "default-sort-order-id": 4, + "sort-orders": [ + { + "order-id": 3, + "fields": [ + {"transform": "identity", "source-id": 2, "direction": "asc", "null-order": "nulls-first"}, + {"transform": "bucket[4]", "source-id": 3, "direction": "desc", "null-order": "nulls-last"} + ] + } + ], + "current-schema-id": 0, + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], + "last-partition-id": 1000, + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [] + }` + + _, err := table.ParseMetadataBytes([]byte(metadataSortOrderNotFound)) + assert.Error(t, err) + assert.ErrorIs(t, err, table.ErrInvalidMetadata) + assert.ErrorContains(t, err, "default-sort-order-id 4 can't be found in [3: [\n2 asc nulls-first\nbucket[4](3) desc nulls-last\n]]") +} + +func TestSortOrderUnsorted(t 
*testing.T) { + sortOrderUnsorted := `{ + "format-version": 2, + "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", + "location": "s3://bucket/test/location", + "last-updated-ms": 1602638573874, + "last-column-id": 3, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "identifier-field-ids": [1, 2], + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + } + ], + "default-sort-order-id": 0, + "sort-orders": [], + "current-schema-id": 0, + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], + "last-partition-id": 1000, + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [] + }` + + var meta table.MetadataV2 + require.NoError(t, json.Unmarshal([]byte(sortOrderUnsorted), &meta)) + + assert.Equal(t, table.UnsortedSortOrderID, meta.DefaultSortOrderID) + assert.Len(t, meta.SortOrderList, 0) +} + +func TestInvalidPartitionSpecID(t *testing.T) { + invalidSpecID := `{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 1, + "schemas": [ + {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": true, "type": "long"}]}, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [1, 2], + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + } + ], + "sort-orders": [], + "default-sort-order-id": 0, + "default-spec-id": 1, + "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, 
"field-id": 1000}]}], + "last-partition-id": 1000 + }` + + var meta table.MetadataV2 + err := json.Unmarshal([]byte(invalidSpecID), &meta) + assert.ErrorIs(t, err, table.ErrInvalidMetadata) + assert.ErrorContains(t, err, "default-spec-id 1 can't be found") +} + +func TestV2RefCreation(t *testing.T) { + var meta table.MetadataV2 + require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2), &meta)) + + maxRefAge := 10000000 + assert.Equal(t, map[string]table.SnapshotRef{ + "main": { + SnapshotID: 3055729675574597004, + SnapshotRefType: table.BranchRef, + }, + "test": { + SnapshotID: 3051729675574597004, + SnapshotRefType: table.TagRef, + MaxRefAgeMs: &maxRefAge, + }, + }, meta.Refs) +} + +func TestV1WriteMetadataToV2(t *testing.T) { + // https://iceberg.apache.org/spec/#version-2 + // + // Table metadata JSON: + // - last-sequence-number was added and is required; default to 0 when reading v1 metadata + // - table-uuid is now required + // - current-schema-id is now required + // - schemas is now required + // - partition-specs is now required + // - default-spec-id is now required + // - last-partition-id is now required + // - sort-orders is now required + // - default-sort-order-id is now required + // - schema is no longer required and should be omitted; use schemas and current-schema-id instead + // - partition-spec is no longer required and should be omitted; use partition-specs and default-spec-id instead + + minimalV1Example := `{ + "format-version": 1, + "location": "s3://bucket/test/location", + "last-updated-ms": 1062638573874, + "last-column-id": 3, + "schema": { + "type": "struct", + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + }, + "partition-spec": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}], + "properties": {}, + "current-snapshot-id": -1, + 
"snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}] + }` + + meta, err := table.ParseMetadataString(minimalV1Example) + require.NoError(t, err) + assert.IsType(t, (*table.MetadataV1)(nil), meta) + + metaV2 := meta.(*table.MetadataV1).ToV2() + metaV2Json, err := json.Marshal(metaV2) + require.NoError(t, err) + + rawData := make(map[string]any) + require.NoError(t, json.Unmarshal(metaV2Json, &rawData)) + + assert.EqualValues(t, 0, rawData["last-sequence-number"]) + assert.NotEmpty(t, rawData["table-uuid"]) + assert.EqualValues(t, 0, rawData["current-schema-id"]) + assert.Equal(t, []any{map[string]any{ + "fields": []any{ + map[string]any{"id": float64(1), "name": "x", "required": true, "type": "long"}, + map[string]any{"id": float64(2), "name": "y", "required": true, "type": "long", "doc": "comment"}, + map[string]any{"id": float64(3), "name": "z", "required": true, "type": "long"}, + }, + "identifier-field-ids": []any{}, + "schema-id": float64(0), + "type": "struct", + }}, rawData["schemas"]) + assert.Equal(t, []any{map[string]any{ + "spec-id": float64(0), + "fields": []any{map[string]any{ + "name": "x", "transform": "identity", + "source-id": float64(1), "field-id": float64(1000), + }}, + }}, rawData["partition-specs"]) + + assert.Zero(t, rawData["default-spec-id"]) + assert.EqualValues(t, 1000, rawData["last-partition-id"]) + assert.Zero(t, rawData["default-sort-order-id"]) + assert.Equal(t, []any{map[string]any{"order-id": float64(0), "fields": []any{}}}, rawData["sort-orders"]) + assert.NotContains(t, rawData, "schema") + assert.NotContains(t, rawData, "partition-spec") +} diff --git a/table/refs.go b/table/refs.go new file mode 100644 index 0000000..e228c7e --- /dev/null +++ b/table/refs.go @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
// MainBranch is the ref name used for a table's main branch.
const MainBranch = "main"

// RefType is the kind of a snapshot reference as it appears in the "type"
// field of a ref.
type RefType string

const (
	BranchRef RefType = "branch"
	TagRef    RefType = "tag"
)

var (
	// ErrInvalidRefType is returned when a decoded ref's type is neither
	// "branch" nor "tag".
	ErrInvalidRefType = errors.New("invalid snapshot ref type, should be 'branch' or 'tag'")
)

// SnapshotRef is a named reference to a snapshot. The three pointer fields
// are optional and are omitted from the JSON output when nil.
type SnapshotRef struct {
	SnapshotID         int64   `json:"snapshot-id"`
	SnapshotRefType    RefType `json:"type"`
	MinSnapshotsToKeep *int    `json:"min-snapshots-to-keep,omitempty"`
	MaxSnapshotAgeMs   *int    `json:"max-snapshot-age-ms,omitempty"`
	MaxRefAgeMs        *int    `json:"max-ref-age-ms,omitempty"`
}

// UnmarshalJSON decodes b into s and then checks the ref type. It returns
// the decoding error if b cannot be decoded into the struct, and
// ErrInvalidRefType if the decoded type is neither BranchRef nor TagRef.
func (s *SnapshotRef) UnmarshalJSON(b []byte) error {
	// Alias has the same fields but none of the methods, so decoding into it
	// does not recurse back into this UnmarshalJSON.
	type Alias SnapshotRef
	aux := (*Alias)(s)

	if err := json.Unmarshal(b, aux); err != nil {
		// propagate the failure instead of reporting success on bad input
		return err
	}

	switch s.SnapshotRefType {
	case BranchRef, TagRef:
		return nil
	default:
		return ErrInvalidRefType
	}
}
// TestInvalidSnapshotRefType checks that decoding a snapshot ref whose "type"
// value is "foobar" fails with an error matching table.ErrInvalidRefType.
func TestInvalidSnapshotRefType(t *testing.T) {
	ref := `{
		"snapshot-id": 3051729675574597004,
		"type": "foobar"
	}`

	var snapRef table.SnapshotRef
	err := json.Unmarshal([]byte(ref), &snapRef)
	assert.ErrorIs(t, err, table.ErrInvalidRefType)
}
+ +package table + +import ( + "encoding/json" + "errors" + "fmt" + "strconv" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/io" + "golang.org/x/exp/maps" +) + +type Operation string + +const ( + OpAppend Operation = "append" + OpReplace Operation = "replace" + OpOverwrite Operation = "overwrite" + OpDelete Operation = "delete" +) + +var ( + ErrInvalidOperation = errors.New("invalid operation value") + ErrMissingOperation = errors.New("missing operation key") +) + +func ValidOperation(s string) (Operation, error) { + switch s { + case "append", "replace", "overwrite", "delete": + return Operation(s), nil + } + return "", fmt.Errorf("%w: found '%s'", ErrInvalidOperation, s) +} + +const operationKey = "operation" + +type Summary struct { + Operation Operation + Properties map[string]string +} + +func (s *Summary) Equals(other *Summary) bool { + if s == other { + return true + } + + if s != nil && other == nil { + return false + } + + if s.Operation != other.Operation { + return false + } + + if len(s.Properties) == 0 && len(other.Properties) == 0 { + return true + } + + return maps.Equal(s.Properties, other.Properties) +} + +func (s *Summary) UnmarshalJSON(b []byte) (err error) { + alias := map[string]string{} + if err = json.Unmarshal(b, &alias); err != nil { + return + } + + op, ok := alias[operationKey] + if !ok { + return ErrMissingOperation + } + + if s.Operation, err = ValidOperation(op); err != nil { + return + } + + delete(alias, operationKey) + s.Properties = alias + return nil +} + +func (s *Summary) MarshalJSON() ([]byte, error) { + props := maps.Clone(s.Properties) + if s.Operation != "" { + if props == nil { + props = make(map[string]string) + } + props[operationKey] = string(s.Operation) + } + + return json.Marshal(props) +} + +type Snapshot struct { + SnapshotID int64 `json:"snapshot-id"` + ParentSnapshotID *int64 `json:"parent-snapshot-id,omitempty"` + SequenceNumber int `json:"sequence-number"` + TimestampMs int 
`json:"timestamp-ms"` + ManifestList string `json:"manifest-list,omitempty"` + Summary *Summary `json:"summary,omitempty"` + SchemaID *int `json:"schema-id,omitempty"` +} + +func (s Snapshot) String() string { + var ( + op, parent, schema string + ) + + if s.Summary != nil { + op = string(s.Summary.Operation) + ": " + } + if s.ParentSnapshotID != nil { + parent = ", parent_id=" + strconv.FormatInt(*s.ParentSnapshotID, 10) + } + if s.SchemaID != nil { + schema = ", schema_id=" + strconv.Itoa(*s.SchemaID) + } + return fmt.Sprintf("%sid=%d%s%s", op, s.SnapshotID, parent, schema) +} + +func (s Snapshot) Equals(other Snapshot) bool { + switch { + case s.ParentSnapshotID == nil && other.ParentSnapshotID != nil: + fallthrough + case s.ParentSnapshotID != nil && other.ParentSnapshotID == nil: + fallthrough + case s.SchemaID == nil && other.SchemaID != nil: + fallthrough + case s.SchemaID != nil && other.SchemaID == nil: + return false + } + + return s.SnapshotID == other.SnapshotID && + ((s.ParentSnapshotID == other.ParentSnapshotID) || (*s.ParentSnapshotID == *other.ParentSnapshotID)) && + ((s.SchemaID == other.SchemaID) || (*s.SchemaID == *other.SchemaID)) && + s.SequenceNumber == other.SequenceNumber && + s.TimestampMs == other.TimestampMs && + s.ManifestList == other.ManifestList && + s.Summary.Equals(other.Summary) +} + +func (s Snapshot) Manifests(fio io.IO) ([]iceberg.ManifestFile, error) { + if s.ManifestList != "" { + f, err := fio.Open(s.ManifestList) + if err != nil { + return nil, fmt.Errorf("could not open manifest file: %w", err) + } + defer f.Close() + return iceberg.ReadManifestList(f) + } + + return nil, nil +} + +type MetadataLogEntry struct { + MetadataFile string `json:"metadata-file"` + TimestampMs int `json:"timestamp-ms"` +} + +type SnapshotLogEntry struct { + SnapshotID int64 `json:"snapshot-id"` + TimestampMs int `json:"timestamp-ms"` +} diff --git a/table/snapshots_test.go b/table/snapshots_test.go new file mode 100644 index 0000000..8be7baa --- 
// testSnapshot builds a fully populated snapshot whose summary carries only
// the append operation and no extra properties.
func testSnapshot() table.Snapshot {
	parentID := int64(19)
	manifest, schemaid := "s3:/a/b/c.avro", 3
	return table.Snapshot{
		SnapshotID:       25,
		ParentSnapshotID: &parentID,
		SequenceNumber:   200,
		TimestampMs:      1602638573590,
		ManifestList:     manifest,
		SchemaID:         &schemaid,
		Summary: &table.Summary{
			Operation: table.OpAppend,
		},
	}
}

// testSnapshotWithProperties is the same fixture as testSnapshot, with one
// extra summary property ("foo": "bar").
func testSnapshotWithProperties() table.Snapshot {
	parentID := int64(19)
	manifest, schemaid := "s3:/a/b/c.avro", 3
	return table.Snapshot{
		SnapshotID:       25,
		ParentSnapshotID: &parentID,
		SequenceNumber:   200,
		TimestampMs:      1602638573590,
		ManifestList:     manifest,
		SchemaID:         &schemaid,
		Summary: &table.Summary{
			Operation:  table.OpAppend,
			Properties: map[string]string{"foo": "bar"},
		},
	}
}

// TestSerializeSnapshot checks the JSON produced for a snapshot whose summary
// has no properties: the summary object holds only the "operation" key.
func TestSerializeSnapshot(t *testing.T) {
	snapshot := testSnapshot()
	data, err := json.Marshal(snapshot)
	require.NoError(t, err)

	assert.JSONEq(t, `{
		"snapshot-id": 25,
		"parent-snapshot-id": 19,
		"sequence-number": 200,
		"timestamp-ms": 1602638573590,
		"manifest-list": "s3:/a/b/c.avro",
		"summary": {"operation": "append"},
		"schema-id": 3
	}`, string(data))
}

// TestSerializeSnapshotWithProps checks that summary properties are written
// into the same flat JSON object as the "operation" key.
func TestSerializeSnapshotWithProps(t *testing.T) {
	snapshot := testSnapshotWithProperties()
	data, err := json.Marshal(snapshot)
	require.NoError(t, err)

	assert.JSONEq(t, `{
		"snapshot-id": 25,
		"parent-snapshot-id": 19,
		"sequence-number": 200,
		"timestamp-ms": 1602638573590,
		"manifest-list": "s3:/a/b/c.avro",
		"summary": {"operation": "append", "foo": "bar"},
		"schema-id": 3
	}`, string(data))
}

// TestMissingOperation checks that a summary without an "operation" key
// fails to decode with table.ErrMissingOperation.
func TestMissingOperation(t *testing.T) {
	var summary table.Summary
	err := json.Unmarshal([]byte(`{"foo": "bar"}`), &summary)
	assert.ErrorIs(t, err, table.ErrMissingOperation)
}

// TestInvalidOperation checks that an unknown operation value fails to decode
// with table.ErrInvalidOperation and that the message names the bad value.
func TestInvalidOperation(t *testing.T) {
	var summary table.Summary
	err := json.Unmarshal([]byte(`{"operation": "foobar"}`), &summary)
	assert.ErrorIs(t, err, table.ErrInvalidOperation)
	assert.ErrorContains(t, err, "found 'foobar'")
}
See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/apache/iceberg-go" +) + +type SortDirection string + +const ( + SortASC SortDirection = "asc" + SortDESC SortDirection = "desc" +) + +type NullOrder string + +const ( + NullsFirst NullOrder = "nulls-first" + NullsLast NullOrder = "nulls-last" +) + +var ( + ErrInvalidSortDirection = errors.New("invalid sort direction, must be 'asc' or 'desc'") + ErrInvalidNullOrder = errors.New("invalid null order, must be 'nulls-first' or 'nulls-last'") +) + +type SortField struct { + SourceID int `json:"source-id"` + Transform iceberg.Transform `json:"transform"` + Direction SortDirection `json:"direction"` + NullOrder NullOrder `json:"null-order"` +} + +func (s *SortField) String() string { + if _, ok := s.Transform.(iceberg.IdentityTransform); ok { + return fmt.Sprintf("%d %s %s", s.SourceID, s.Direction, s.NullOrder) + } + return fmt.Sprintf("%s(%d) %s %s", s.Transform, s.SourceID, s.Direction, s.NullOrder) +} + +func (s *SortField) MarshalJSON() ([]byte, error) { + if s.Direction == "" { + s.Direction = SortASC + } + + if s.NullOrder == "" { + if s.Direction == SortASC { + s.NullOrder = NullsFirst + } else { + s.NullOrder = NullsLast + } + } + + type Alias SortField + return json.Marshal((*Alias)(s)) +} + +func (s *SortField) UnmarshalJSON(b []byte) error { + type Alias SortField + var aux = struct { + TransformString string `json:"transform"` + *Alias + }{ + Alias: (*Alias)(s), + } + + err := json.Unmarshal(b, &aux) + if err != nil { + return err + } + + if s.Transform, err = iceberg.ParseTransform(aux.TransformString); err != nil { + return err + } + + switch s.Direction { + case SortASC, SortDESC: + default: + return ErrInvalidSortDirection + } + + switch s.NullOrder { + case NullsFirst, NullsLast: + default: + return ErrInvalidNullOrder + } + + return nil +} + +const ( + 
InitialSortOrderID = 1 + UnsortedSortOrderID = 0 +) + +var UnsortedSortOrder = SortOrder{OrderID: UnsortedSortOrderID, Fields: []SortField{}} + +type SortOrder struct { + OrderID int `json:"order-id"` + Fields []SortField `json:"fields"` +} + +func (s SortOrder) String() string { + var b strings.Builder + fmt.Fprintf(&b, "%d: ", s.OrderID) + b.WriteByte('[') + for i, f := range s.Fields { + if i == 0 { + b.WriteByte('\n') + } + b.WriteString(f.String()) + b.WriteByte('\n') + } + b.WriteByte(']') + return b.String() +} + +func (s *SortOrder) UnmarshalJSON(b []byte) error { + type Alias SortOrder + aux := (*Alias)(s) + + if err := json.Unmarshal(b, aux); err != nil { + return err + } + + if len(s.Fields) == 0 { + s.Fields = []SortField{} + s.OrderID = 0 + return nil + } + + if s.OrderID == 0 { + s.OrderID = InitialSortOrderID // initialize default sort order id + } + + return nil +} diff --git a/table/sorting_test.go b/table/sorting_test.go new file mode 100644 index 0000000..c12c8ff --- /dev/null +++ b/table/sorting_test.go @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
// sortOrder is the fixture for the serialization test. Its fields leave out
// either Direction or NullOrder so that marshalling has to fill in defaults.
var sortOrder = table.SortOrder{
	OrderID: 22,
	Fields: []table.SortField{
		{SourceID: 19, Transform: iceberg.IdentityTransform{}, NullOrder: table.NullsFirst},
		{SourceID: 25, Transform: iceberg.BucketTransform{NumBuckets: 4}, Direction: table.SortDESC},
		{SourceID: 22, Transform: iceberg.VoidTransform{}, Direction: table.SortASC},
	},
}

// TestSerializeUnsortedSortOrder checks that the unsorted order marshals with
// order-id 0 and an empty "fields" array.
func TestSerializeUnsortedSortOrder(t *testing.T) {
	data, err := json.Marshal(table.UnsortedSortOrder)
	require.NoError(t, err)
	assert.JSONEq(t, `{"order-id": 0, "fields": []}`, string(data))
}

// TestSerializeSortOrder checks the JSON for the sortOrder fixture, including
// the direction and null-order values the fixture did not set explicitly.
func TestSerializeSortOrder(t *testing.T) {
	data, err := json.Marshal(sortOrder)
	require.NoError(t, err)
	assert.JSONEq(t, `{
		"order-id": 22,
		"fields": [
			{"source-id": 19, "transform": "identity", "direction": "asc", "null-order": "nulls-first"},
			{"source-id": 25, "transform": "bucket[4]", "direction": "desc", "null-order": "nulls-last"},
			{"source-id": 22, "transform": "void", "direction": "asc", "null-order": "nulls-first"}
		]
	}`, string(data))
}

// TestUnmarshalSortOrderDefaults checks the order-id defaults applied on
// decode: an order with no fields equals UnsortedSortOrder, and an order with
// fields but no order-id gets InitialSortOrderID.
func TestUnmarshalSortOrderDefaults(t *testing.T) {
	var order table.SortOrder
	require.NoError(t, json.Unmarshal([]byte(`{"fields": []}`), &order))
	assert.Equal(t, table.UnsortedSortOrder, order)

	require.NoError(t, json.Unmarshal([]byte(`{"fields": [{"source-id": 19, "transform": "identity", "direction": "asc", "null-order": "nulls-first"}]}`), &order))
	assert.Equal(t, table.InitialSortOrderID, order.OrderID)
}

// TestUnmarshalInvalidSortDirection checks that a field with direction
// "foobar" makes decoding fail with table.ErrInvalidSortDirection.
func TestUnmarshalInvalidSortDirection(t *testing.T) {
	badJson := `{
		"order-id": 22,
		"fields": [
			{"source-id": 19, "transform": "identity", "direction": "foobar", "null-order": "nulls-first"},
			{"source-id": 25, "transform": "bucket[4]", "direction": "desc", "null-order": "nulls-last"},
			{"source-id": 22, "transform": "void", "direction": "asc", "null-order": "nulls-first"}
		]
	}`

	var order table.SortOrder
	err := json.Unmarshal([]byte(badJson), &order)
	assert.ErrorIs(t, err, table.ErrInvalidSortDirection)
}

// TestUnmarshalInvalidSortNullOrder checks that a field with null-order
// "foobar" makes decoding fail with table.ErrInvalidNullOrder.
func TestUnmarshalInvalidSortNullOrder(t *testing.T) {
	badJson := `{
		"order-id": 22,
		"fields": [
			{"source-id": 19, "transform": "identity", "direction": "asc", "null-order": "foobar"},
			{"source-id": 25, "transform": "bucket[4]", "direction": "desc", "null-order": "nulls-last"},
			{"source-id": 22, "transform": "void", "direction": "asc", "null-order": "nulls-first"}
		]
	}`

	var order table.SortOrder
	err := json.Unmarshal([]byte(badJson), &order)
	assert.ErrorIs(t, err, table.ErrInvalidNullOrder)
}

// TestUnmarshalInvalidSortTransform checks that a field with transform
// "foobar" makes decoding fail with iceberg.ErrInvalidTransform.
func TestUnmarshalInvalidSortTransform(t *testing.T) {
	badJson := `{
		"order-id": 22,
		"fields": [
			{"source-id": 19, "transform": "foobar", "direction": "asc", "null-order": "nulls-first"},
			{"source-id": 25, "transform": "bucket[4]", "direction": "desc", "null-order": "nulls-last"},
			{"source-id": 22, "transform": "void", "direction": "asc", "null-order": "nulls-first"}
		]
	}`

	var order table.SortOrder
	err := json.Unmarshal([]byte(badJson), &order)
	assert.ErrorIs(t, err, iceberg.ErrInvalidTransform)
}
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "reflect" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/io" + "golang.org/x/exp/slices" +) + +type Identifier = []string + +type Table struct { + identifier Identifier + metadata Metadata + metadataLocation string + fs io.IO +} + +func (t Table) Equals(other Table) bool { + return slices.Equal(t.identifier, other.identifier) && + t.metadataLocation == other.metadataLocation && + reflect.DeepEqual(t.metadata, other.metadata) +} + +func (t Table) Identifier() Identifier { return t.identifier } +func (t Table) Metadata() Metadata { return t.metadata } +func (t Table) MetadataLoc() string { return t.metadataLocation } +func (t Table) FS() io.IO { return t.fs } + +func (t Table) Schema() *iceberg.Schema { return t.metadata.CurrentSchema() } +func (t Table) Spec() iceberg.PartitionSpec { return t.metadata.PartitionSpec() } +func (t Table) SortOrder() SortOrder { return t.metadata.SortOrder() } +func (t Table) Properties() iceberg.Properties { return t.metadata.Properties() } +func (t Table) Location() string { return t.metadata.Loc() } +func (t Table) CurrentSnapshot() *Snapshot { return t.metadata.CurrentSnapshot() } +func (t Table) SnapshotByID(id int64) *Snapshot { return t.metadata.SnapshotByID(id) } +func (t Table) SnapshotByName(name string) *Snapshot { return t.metadata.SnapshotByName(name) } +func (t Table) Schemas() map[int]*iceberg.Schema { + m := make(map[int]*iceberg.Schema) + for _, s := range t.metadata.Schemas() { + m[s.ID] = s + } + return m +} + +func New(ident Identifier, meta 
Metadata, location string, fs io.IO) *Table { + return &Table{ + identifier: ident, + metadata: meta, + metadataLocation: location, + fs: fs, + } +} + +func NewFromLocation(ident Identifier, metalocation string, fsys io.IO) (*Table, error) { + var meta Metadata + + if rf, ok := fsys.(io.ReadFileIO); ok { + data, err := rf.ReadFile(metalocation) + if err != nil { + return nil, err + } + + if meta, err = ParseMetadataBytes(data); err != nil { + return nil, err + } + } else { + f, err := fsys.Open(metalocation) + if err != nil { + return nil, err + } + defer f.Close() + + if meta, err = ParseMetadata(f); err != nil { + return nil, err + } + } + return New(ident, meta, metalocation, fsys), nil +} diff --git a/table/table_test.go b/table/table_test.go new file mode 100644 index 0000000..429fe0c --- /dev/null +++ b/table/table_test.go @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
// TableTestSuite runs the table tests against a single table that SetupSuite
// loads from the ExampleTableMetadataV2 fixture.
type TableTestSuite struct {
	suite.Suite

	tbl *table.Table
}

// TestTable is the go test entry point for TableTestSuite.
func TestTable(t *testing.T) {
	suite.Run(t, new(TableTestSuite))
}

// SetupSuite loads the shared table through a mock file system that only
// answers Open, then checks the identifier, metadata location and file system
// stored on the result.
func (t *TableTestSuite) SetupSuite() {
	var mockfs internal.MockFS
	mockfs.Test(t.T())
	mockfs.On("Open", "s3://bucket/test/location/uuid.metadata.json").
		Return(&internal.MockFile{Contents: bytes.NewReader([]byte(ExampleTableMetadataV2))}, nil)
	defer mockfs.AssertExpectations(t.T())

	tbl, err := table.NewFromLocation([]string{"foo"}, "s3://bucket/test/location/uuid.metadata.json", &mockfs)
	t.Require().NoError(err)
	t.Require().NotNil(tbl)

	t.Equal([]string{"foo"}, tbl.Identifier())
	t.Equal("s3://bucket/test/location/uuid.metadata.json", tbl.MetadataLoc())
	t.Equal(&mockfs, tbl.FS())

	t.tbl = tbl
}

// TestNewTableFromReadFile loads the same metadata through a mock that
// answers ReadFile and checks that the result equals the table from
// SetupSuite.
func (t *TableTestSuite) TestNewTableFromReadFile() {
	var mockfsReadFile internal.MockFSReadFile
	mockfsReadFile.Test(t.T())
	mockfsReadFile.On("ReadFile", "s3://bucket/test/location/uuid.metadata.json").
		Return([]byte(ExampleTableMetadataV2), nil)
	defer mockfsReadFile.AssertExpectations(t.T())

	tbl2, err := table.NewFromLocation([]string{"foo"}, "s3://bucket/test/location/uuid.metadata.json", &mockfsReadFile)
	t.Require().NoError(err)
	t.Require().NotNil(tbl2)

	t.True(t.tbl.Equals(*tbl2))
}

// TestSchema checks the table's current schema against the expected
// three-column schema with ID 1.
func (t *TableTestSuite) TestSchema() {
	t.True(t.tbl.Schema().Equals(iceberg.NewSchemaWithIdentifiers(1, []int{1, 2},
		iceberg.NestedField{ID: 1, Name: "x", Type: iceberg.PrimitiveTypes.Int64, Required: true},
		iceberg.NestedField{ID: 2, Name: "y", Type: iceberg.PrimitiveTypes.Int64, Required: true, Doc: "comment"},
		iceberg.NestedField{ID: 3, Name: "z", Type: iceberg.PrimitiveTypes.Int64, Required: true},
	)))
}

// TestPartitionSpec checks that the table's spec is a single identity
// partition field on source column 1.
func (t *TableTestSuite) TestPartitionSpec() {
	t.Equal(iceberg.NewPartitionSpec(
		iceberg.PartitionField{SourceID: 1, FieldID: 1000, Transform: iceberg.IdentityTransform{}, Name: "x"},
	), t.tbl.Spec())
}

// TestSortOrder checks the table's sort order: ID 3 with an ascending
// identity field and a descending bucket[4] field.
func (t *TableTestSuite) TestSortOrder() {
	t.Equal(table.SortOrder{
		OrderID: 3,
		Fields: []table.SortField{
			{SourceID: 2, Transform: iceberg.IdentityTransform{}, Direction: table.SortASC, NullOrder: table.NullsFirst},
			{SourceID: 3, Transform: iceberg.BucketTransform{NumBuckets: 4}, Direction: table.SortDESC, NullOrder: table.NullsLast},
		},
	}, t.tbl.SortOrder())
}

// TestLocation checks the table location taken from the metadata.
func (t *TableTestSuite) TestLocation() {
	t.Equal("s3://bucket/test/location", t.tbl.Location())
}

// TestSnapshot checks that both CurrentSnapshot and SnapshotByID for ID
// 3055729675574597004 equal the expected snapshot.
func (t *TableTestSuite) TestSnapshot() {
	var (
		parentSnapshotID int64 = 3051729675574597004
		one                    = 1
		manifestList           = "s3://a/b/2.avro"
	)

	testSnapshot := table.Snapshot{
		SnapshotID:       3055729675574597004,
		ParentSnapshotID: &parentSnapshotID,
		SequenceNumber:   1,
		TimestampMs:      1555100955770,
		ManifestList:     manifestList,
		Summary:          &table.Summary{Operation: table.OpAppend, Properties: map[string]string{}},
		SchemaID:         &one,
	}
	t.True(testSnapshot.Equals(*t.tbl.CurrentSnapshot()))

	t.True(testSnapshot.Equals(*t.tbl.SnapshotByID(3055729675574597004)))
}

// TestSnapshotByName checks that the ref named "test" resolves to the
// expected snapshot, which has no parent and no schema ID.
func (t *TableTestSuite) TestSnapshotByName() {
	testSnapshot := table.Snapshot{
		SnapshotID:   3051729675574597004,
		TimestampMs:  1515100955770,
		ManifestList: "s3://a/b/1.avro",
		Summary:      &table.Summary{Operation: table.OpAppend},
	}

	t.True(testSnapshot.Equals(*t.tbl.SnapshotByName("test")))
}
From f84c8266203b9f22a15c3209890bb3ab59a91e5f Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 26 Sep 2023 16:21:53 -0400 Subject: [PATCH 2/6] update go.mod --- go.mod | 3 ++- go.sum | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 5b05903..827297b 100644 --- a/go.mod +++ b/go.mod @@ -24,10 +24,11 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.18.44 github.com/aws/aws-sdk-go-v2/credentials v1.13.42 github.com/aws/aws-sdk-go-v2/service/s3 v1.40.1 + github.com/google/uuid v1.3.1 github.com/hamba/avro/v2 v2.16.0 github.com/stretchr/testify v1.8.4 github.com/wolfeidau/s3iofs v1.3.0 - golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb + golang.org/x/exp v0.0.0-20231006140011-7918f672742d ) require ( diff --git a/go.sum b/go.sum index 4b892ec..fe1e9a8 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,8 @@ github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hamba/avro/v2 v2.16.0 h1:0XhyP65Hs8iMLtdSR0v7ZrwRjsbIZdvr7KzYgmx1Mbo= github.com/hamba/avro/v2 v2.16.0/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo= github.com/jmespath/go-jmespath v0.4.0/go.mod
h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= @@ -70,6 +72,8 @@ github.com/wolfeidau/s3iofs v1.3.0 h1:O6lf4HCZVuHMjCdMYrpQIfKzLDYbO5BtNrH5RdQGv4 github.com/wolfeidau/s3iofs v1.3.0/go.mod h1:oWH749wXNok7pJ+5MK7mi5uyS7GqQzeGApEGwUSaIjc= golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb h1:mIKbk8weKhSeLH2GmUTrvx8CjkyJmnU1wFmg59CUjFA= golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= From 18e544eeb0e374753a5c74eef1f47d3e5789dc51 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 26 Sep 2023 16:53:04 -0400 Subject: [PATCH 3/6] adding comments --- table/metadata.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++ table/refs.go | 2 ++ table/snapshots.go | 5 +++++ table/sorting.go | 19 +++++++++++++++--- 4 files changed, 73 insertions(+), 3 deletions(-) diff --git a/table/metadata.go b/table/metadata.go index 30fea19..3892d82 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -28,24 +28,65 @@ import ( "github.com/google/uuid" ) +// Metadata for an iceberg table as specified in the Iceberg spec +// +// https://iceberg.apache.org/spec/#iceberg-table-spec type Metadata interface { + // Version indicates the version of this metadata, 1 for V1, 2 for V2, etc. Version() int + // TableUUID returns a UUID that identifies the table, generated when the + // table is created. Implementations must throw an exception if a table's + // UUID does not match the expected UUID after refreshing metadata. TableUUID() uuid.UUID + // Loc is the table's base location. 
This is used by writers to determine + // where to store data files, manifest files, and table metadata files. Loc() string + // LastUpdated is the timestamp in milliseconds from the unix epoch when + // the table was last updated. Each table metadata file should update this + // field just before writing. LastUpdated() int + // LastColumn returns the highest assigned column ID for the table. + // This is used to ensure fields are always assigned an unused ID when + // evolving schemas. LastColumn() int + // Schemas returns the list of schemas, stored as objects with their + // schema-id. Schemas() []*iceberg.Schema + // CurrentSchema returns the table's current schema. CurrentSchema() *iceberg.Schema + // PartitionSpecs returns the list of all partition specs in the table. PartitionSpecs() []iceberg.PartitionSpec + // PartitionSpec returns the current partition spec that the table is using. PartitionSpec() iceberg.PartitionSpec + // DefaultPartitionSpec is the ID of the current spec that writers should + // use by default. DefaultPartitionSpec() int + // LastPartitionSpecID is the highest assigned partition field ID across + // all partition specs for the table. This is used to ensure partition + // fields are always assigned an unused ID when evolving specs. LastPartitionSpecID() *int + // Snapshots returns the list of valid snapshots. Valid snapshots are + // snapshots for which all data files exist in the file system. A data + // file must not be deleted from the file system until the last snapshot + // in which it was listed is garbage collected. Snapshots() []Snapshot + // SnapshotByID finds and returns a specific snapshot by its ID. Returns + // nil if the ID is not found in the list of snapshots. SnapshotByID(int64) *Snapshot + // SnapshotByName searches the list of snapshots for a snapshot with a given + // ref name. Returns nil if there's no ref with this name for a snapshot.
SnapshotByName(name string) *Snapshot + // CurrentSnapshot returns the table's current snapshot. CurrentSnapshot() *Snapshot + // SortOrder returns the table's current sort order, ie: the one with the + // ID that matches the default-sort-order-id. SortOrder() SortOrder + // SortOrders returns the list of sort orders in the table. SortOrders() []SortOrder + // Properties is a string to string map of table properties. This is used + // to control settings that affect reading and writing and is not intended + // to be used for arbitrary metadata. For example, commit.retry.num-retries + // is used to control the number of commit retries. Properties() iceberg.Properties } @@ -54,6 +95,8 @@ var ( ErrInvalidMetadata = errors.New("invalid metadata") ) +// ParseMetadata parses json metadata provided by the passed in reader, +// returning an error if one is encountered. func ParseMetadata(r io.Reader) (Metadata, error) { data, err := io.ReadAll(r) if err != nil { @@ -63,10 +106,13 @@ func ParseMetadata(r io.Reader) (Metadata, error) { return ParseMetadataBytes(data) } +// ParseMetadataString is like [ParseMetadata], but for a string rather than +// an io.Reader. func ParseMetadataString(s string) (Metadata, error) { return ParseMetadataBytes([]byte(s)) } +// ParseMetadataBytes is like [ParseMetadataString] but for a byte slice. func ParseMetadataBytes(b []byte) (Metadata, error) { ver := struct { FormatVersion int `json:"format-version"` @@ -180,6 +226,10 @@ func (c *commonMetadata) Properties() iceberg.Properties { return c.Props } +// preValidate updates values in the metadata struct with defaults based on +// combinations of struct members. Such as initializing slices as empty slices +// if they were null in the metadata, or normalizing inconsistencies between +// metadata versions. 
func (c *commonMetadata) preValidate() { if c.CurrentSnapshotID != nil && *c.CurrentSnapshotID == -1 { // treat -1 as the same as nil, clean this up in pre-validation diff --git a/table/refs.go b/table/refs.go index e228c7e..715bae9 100644 --- a/table/refs.go +++ b/table/refs.go @@ -24,6 +24,7 @@ import ( const MainBranch = "main" +// RefType will be either a BranchRef or a TagRef type RefType string const ( @@ -35,6 +36,7 @@ var ( ErrInvalidRefType = errors.New("invalid snapshot ref type, should be 'branch' or 'tag'") ) +// SnapshotRef represents the reference information for a specific snapshot type SnapshotRef struct { SnapshotID int64 `json:"snapshot-id"` SnapshotRefType RefType `json:"type"` diff --git a/table/snapshots.go b/table/snapshots.go index 9897739..b60d3e5 100644 --- a/table/snapshots.go +++ b/table/snapshots.go @@ -42,6 +42,8 @@ var ( ErrMissingOperation = errors.New("missing operation key") ) +// ValidOperation ensures that a given string is one of the valid operation +// types: append,replace,overwrite,delete func ValidOperation(s string) (Operation, error) { switch s { case "append", "replace", "overwrite", "delete": @@ -52,6 +54,9 @@ func ValidOperation(s string) (Operation, error) { const operationKey = "operation" +// Summary stores the summary information for a snapshot indicating +// the operation that created the snapshot, and various properties +// which might exist in the summary. type Summary struct { Operation Operation Properties map[string]string diff --git a/table/sorting.go b/table/sorting.go index d4bfd8b..89bc76c 100644 --- a/table/sorting.go +++ b/table/sorting.go @@ -45,11 +45,18 @@ var ( ErrInvalidNullOrder = errors.New("invalid null order, must be 'nulls-first' or 'nulls-last'") ) +// SortField describes a field used in a sort order definition. 
type SortField struct { - SourceID int `json:"source-id"` + // SourceID is the source column id from the table's schema + SourceID int `json:"source-id"` + // Transform is the transformation used to produce values to be + // sorted on from the source column. Transform iceberg.Transform `json:"transform"` - Direction SortDirection `json:"direction"` - NullOrder NullOrder `json:"null-order"` + // Direction is an enum indicating ascending or descending direction. + Direction SortDirection `json:"direction"` + // NullOrder describes the order of null values when sorting + // should be only either nulls-first or nulls-last enum values. + NullOrder NullOrder `json:"null-order"` } func (s *SortField) String() string { @@ -114,8 +121,14 @@ const ( UnsortedSortOrderID = 0 ) +// A default Sort Order indicating no sort order at all var UnsortedSortOrder = SortOrder{OrderID: UnsortedSortOrderID, Fields: []SortField{}} +// SortOrder describes how the data is sorted within the table. +// +// Data can be sorted within partitions by columns to gain performance. The +// order of the sort fields within the list defines the order in which the +// sort is applied to the data. type SortOrder struct { OrderID int `json:"order-id"` Fields []SortField `json:"fields"` From 6023e54f7170d1731f94c23a52d2deb8a945486b Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 6 Oct 2023 11:57:06 -0400 Subject: [PATCH 4/6] updates from feedback --- table/metadata.go | 16 ++++++++-------- table/metadata_test.go | 10 +++++----- table/snapshots.go | 8 ++++---- table/table.go | 10 +++++----- table/table_test.go | 2 +- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/table/metadata.go b/table/metadata.go index 3892d82..893d062 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -38,13 +38,13 @@ type Metadata interface { // table is created. Implementations must throw an exception if a table's // UUID does not match the expected UUID after refreshing metadata.
TableUUID() uuid.UUID - // Loc is the table's base location. This is used by writers to determine + // Location is the table's base location. This is used by writers to determine // where to store data files, manifest files, and table metadata files. - Loc() string - // LastUpdated is the timestamp in milliseconds from the unix epoch when + Location() string + // LastUpdatedMillis is the timestamp in milliseconds from the unix epoch when // the table was last updated. Each table metadata file should update this // field just before writing. - LastUpdated() int + LastUpdatedMillis() int64 // LastColumn returns the highest assigned column ID for the table. // This is used to ensure fields are always assigned an unused ID when // evolving schemas. @@ -138,8 +138,8 @@ func ParseMetadataBytes(b []byte) (Metadata, error) { type commonMetadata struct { FormatVersion int `json:"format-version"` UUID uuid.UUID `json:"table-uuid"` - Location string `json:"location"` - LastUpdatedMS int `json:"last-updated-ms"` + Loc string `json:"location"` + LastUpdatedMS int64 `json:"last-updated-ms"` LastColumnID int `json:"last-column-id"` SchemaList []*iceberg.Schema `json:"schemas"` CurrentSchemaID int `json:"current-schema-id"` @@ -157,8 +157,8 @@ type commonMetadata struct { } func (c *commonMetadata) TableUUID() uuid.UUID { return c.UUID } -func (c *commonMetadata) Loc() string { return c.Location } -func (c *commonMetadata) LastUpdated() int { return c.LastUpdatedMS } +func (c *commonMetadata) Location() string { return c.Loc } +func (c *commonMetadata) LastUpdatedMillis() int64 { return c.LastUpdatedMS } func (c *commonMetadata) LastColumn() int { return c.LastColumnID } func (c *commonMetadata) Schemas() []*iceberg.Schema { return c.SchemaList } func (c *commonMetadata) CurrentSchema() *iceberg.Schema { diff --git a/table/metadata_test.go b/table/metadata_test.go index 46f10f9..e8dac51 100644 --- a/table/metadata_test.go +++ b/table/metadata_test.go @@ -120,8 +120,8 @@ func 
TestMetadataV1Parsing(t *testing.T) { data := meta.(*table.MetadataV1) assert.Equal(t, uuid.MustParse("d20125c8-7284-442c-9aea-15fee620737c"), meta.TableUUID()) - assert.Equal(t, "s3://bucket/test/location", meta.Loc()) - assert.Equal(t, 1602638573874, meta.LastUpdated()) + assert.Equal(t, "s3://bucket/test/location", meta.Location()) + assert.Equal(t, int64(1602638573874), meta.LastUpdatedMillis()) assert.Equal(t, 3, meta.LastColumn()) expected := iceberg.NewSchema( @@ -166,9 +166,9 @@ func TestMetadataV2Parsing(t *testing.T) { data := meta.(*table.MetadataV2) assert.Equal(t, uuid.MustParse("9c12d441-03fe-4693-9a96-a0705ddf69c1"), data.UUID) - assert.Equal(t, "s3://bucket/test/location", data.Location) + assert.Equal(t, "s3://bucket/test/location", data.Location()) assert.Equal(t, 34, data.LastSequenceNumber) - assert.Equal(t, 1602638573590, data.LastUpdatedMS) + assert.Equal(t, int64(1602638573590), data.LastUpdatedMS) assert.Equal(t, 3, data.LastColumnID) assert.Equal(t, 0, data.SchemaList[0].ID) assert.Equal(t, 1, data.CurrentSchemaID) @@ -178,7 +178,7 @@ func TestMetadataV2Parsing(t *testing.T) { assert.EqualValues(t, "134217728", data.Props["read.split.target.size"]) assert.EqualValues(t, 3055729675574597004, *data.CurrentSnapshotID) assert.EqualValues(t, 3051729675574597004, data.SnapshotList[0].SnapshotID) - assert.Equal(t, 1515100955770, data.SnapshotLog[0].TimestampMs) + assert.Equal(t, int64(1515100955770), data.SnapshotLog[0].TimestampMs) assert.Equal(t, 3, data.SortOrderList[0].OrderID) assert.Equal(t, 3, data.DefaultSortOrderID) diff --git a/table/snapshots.go b/table/snapshots.go index b60d3e5..c539c84 100644 --- a/table/snapshots.go +++ b/table/snapshots.go @@ -117,8 +117,8 @@ func (s *Summary) MarshalJSON() ([]byte, error) { type Snapshot struct { SnapshotID int64 `json:"snapshot-id"` ParentSnapshotID *int64 `json:"parent-snapshot-id,omitempty"` - SequenceNumber int `json:"sequence-number"` - TimestampMs int `json:"timestamp-ms"` + SequenceNumber 
int64 `json:"sequence-number"` + TimestampMs int64 `json:"timestamp-ms"` ManifestList string `json:"manifest-list,omitempty"` Summary *Summary `json:"summary,omitempty"` SchemaID *int `json:"schema-id,omitempty"` @@ -177,10 +177,10 @@ func (s Snapshot) Manifests(fio io.IO) ([]iceberg.ManifestFile, error) { type MetadataLogEntry struct { MetadataFile string `json:"metadata-file"` - TimestampMs int `json:"timestamp-ms"` + TimestampMs int64 `json:"timestamp-ms"` } type SnapshotLogEntry struct { SnapshotID int64 `json:"snapshot-id"` - TimestampMs int `json:"timestamp-ms"` + TimestampMs int64 `json:"timestamp-ms"` } diff --git a/table/table.go b/table/table.go index be8b3ec..80b68b3 100644 --- a/table/table.go +++ b/table/table.go @@ -40,16 +40,16 @@ func (t Table) Equals(other Table) bool { reflect.DeepEqual(t.metadata, other.metadata) } -func (t Table) Identifier() Identifier { return t.identifier } -func (t Table) Metadata() Metadata { return t.metadata } -func (t Table) MetadataLoc() string { return t.metadataLocation } -func (t Table) FS() io.IO { return t.fs } +func (t Table) Identifier() Identifier { return t.identifier } +func (t Table) Metadata() Metadata { return t.metadata } +func (t Table) MetadataLocation() string { return t.metadataLocation } +func (t Table) FS() io.IO { return t.fs } func (t Table) Schema() *iceberg.Schema { return t.metadata.CurrentSchema() } func (t Table) Spec() iceberg.PartitionSpec { return t.metadata.PartitionSpec() } func (t Table) SortOrder() SortOrder { return t.metadata.SortOrder() } func (t Table) Properties() iceberg.Properties { return t.metadata.Properties() } -func (t Table) Location() string { return t.metadata.Loc() } +func (t Table) Location() string { return t.metadata.Location() } func (t Table) CurrentSnapshot() *Snapshot { return t.metadata.CurrentSnapshot() } func (t Table) SnapshotByID(id int64) *Snapshot { return t.metadata.SnapshotByID(id) } func (t Table) SnapshotByName(name string) *Snapshot { return 
t.metadata.SnapshotByName(name) } diff --git a/table/table_test.go b/table/table_test.go index 429fe0c..cde94ab 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -49,7 +49,7 @@ func (t *TableTestSuite) SetupSuite() { t.Require().NotNil(tbl) t.Equal([]string{"foo"}, tbl.Identifier()) - t.Equal("s3://bucket/test/location/uuid.metadata.json", tbl.MetadataLoc()) + t.Equal("s3://bucket/test/location/uuid.metadata.json", tbl.MetadataLocation()) t.Equal(&mockfs, tbl.FS()) t.tbl = tbl From a4e6037d52d1cbeea6d4c94f68738e0f9ec4879d Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 6 Oct 2023 13:36:34 -0400 Subject: [PATCH 5/6] some more tests --- table/metadata_test.go | 2 +- table/refs.go | 4 ++-- table/refs_test.go | 35 +++++++++++++++++++++++++++++++++++ table/snapshots.go | 14 ++++++++++++-- table/snapshots_test.go | 10 ++++++++++ 5 files changed, 60 insertions(+), 5 deletions(-) diff --git a/table/metadata_test.go b/table/metadata_test.go index e8dac51..6547baa 100644 --- a/table/metadata_test.go +++ b/table/metadata_test.go @@ -401,7 +401,7 @@ func TestV2RefCreation(t *testing.T) { var meta table.MetadataV2 require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2), &meta)) - maxRefAge := 10000000 + maxRefAge := int64(10000000) assert.Equal(t, map[string]table.SnapshotRef{ "main": { SnapshotID: 3055729675574597004, diff --git a/table/refs.go b/table/refs.go index 715bae9..cf63efc 100644 --- a/table/refs.go +++ b/table/refs.go @@ -41,8 +41,8 @@ type SnapshotRef struct { SnapshotID int64 `json:"snapshot-id"` SnapshotRefType RefType `json:"type"` MinSnapshotsToKeep *int `json:"min-snapshots-to-keep,omitempty"` - MaxSnapshotAgeMs *int `json:"max-snapshot-age-ms,omitempty"` - MaxRefAgeMs *int `json:"max-ref-age-ms,omitempty"` + MaxSnapshotAgeMs *int64 `json:"max-snapshot-age-ms,omitempty"` + MaxRefAgeMs *int64 `json:"max-ref-age-ms,omitempty"` } func (s *SnapshotRef) UnmarshalJSON(b []byte) error { diff --git a/table/refs_test.go b/table/refs_test.go 
index bb61b1d..d8b54e4 100644 --- a/table/refs_test.go +++ b/table/refs_test.go @@ -35,3 +35,38 @@ func TestInvalidSnapshotRefType(t *testing.T) { err := json.Unmarshal([]byte(ref), &snapRef) assert.ErrorIs(t, err, table.ErrInvalidRefType) } + +func TestSnapshotBranchRef(t *testing.T) { + ref := `{ + "snapshot-id": 3051729675574597004, + "type": "branch" + }` + + var snapRef table.SnapshotRef + err := json.Unmarshal([]byte(ref), &snapRef) + assert.NoError(t, err) + + assert.Equal(t, table.BranchRef, snapRef.SnapshotRefType) + assert.Equal(t, int64(3051729675574597004), snapRef.SnapshotID) + assert.Nil(t, snapRef.MinSnapshotsToKeep) + assert.Nil(t, snapRef.MaxRefAgeMs) + assert.Nil(t, snapRef.MaxSnapshotAgeMs) +} + +func TestSnapshotTagRef(t *testing.T) { + ref := `{ + "snapshot-id": 3051729675574597004, + "type": "tag", + "min-snapshots-to-keep": 10 + }` + + var snapRef table.SnapshotRef + err := json.Unmarshal([]byte(ref), &snapRef) + assert.NoError(t, err) + + assert.Equal(t, table.TagRef, snapRef.SnapshotRefType) + assert.Equal(t, int64(3051729675574597004), snapRef.SnapshotID) + assert.Equal(t, 10, *snapRef.MinSnapshotsToKeep) + assert.Nil(t, snapRef.MaxRefAgeMs) + assert.Nil(t, snapRef.MaxSnapshotAgeMs) +} diff --git a/table/snapshots.go b/table/snapshots.go index c539c84..26dc8d2 100644 --- a/table/snapshots.go +++ b/table/snapshots.go @@ -62,6 +62,15 @@ type Summary struct { Properties map[string]string } +func (s *Summary) String() string { + out := string(s.Operation) + if s.Properties != nil { + data, _ := json.Marshal(s.Properties) + out += ", " + string(data) + } + return out +} + func (s *Summary) Equals(other *Summary) bool { if s == other { return true @@ -130,7 +139,7 @@ func (s Snapshot) String() string { ) if s.Summary != nil { - op = string(s.Summary.Operation) + ": " + op = s.Summary.String() + ": " } if s.ParentSnapshotID != nil { parent = ", parent_id=" + strconv.FormatInt(*s.ParentSnapshotID, 10) @@ -138,7 +147,8 @@ func (s Snapshot) String() 
string { if s.SchemaID != nil { schema = ", schema_id=" + strconv.Itoa(*s.SchemaID) } - return fmt.Sprintf("%sid=%d%s%s", op, s.SnapshotID, parent, schema) + return fmt.Sprintf("%sid=%d%s%s, sequence_number=%d, timestamp_ms=%d, manifest_list=%s", + op, s.SnapshotID, parent, schema, s.SequenceNumber, s.TimestampMs, s.ManifestList) } func (s Snapshot) Equals(other Snapshot) bool { diff --git a/table/snapshots_test.go b/table/snapshots_test.go index 8be7baa..f79f977 100644 --- a/table/snapshots_test.go +++ b/table/snapshots_test.go @@ -103,3 +103,13 @@ func TestInvalidOperation(t *testing.T) { assert.ErrorIs(t, err, table.ErrInvalidOperation) assert.ErrorContains(t, err, "found 'foobar'") } + +func TestSnapshotString(t *testing.T) { + snapshot := testSnapshot() + assert.Equal(t, `append: id=25, parent_id=19, schema_id=3, sequence_number=200, timestamp_ms=1602638573590, manifest_list=s3:/a/b/c.avro`, + snapshot.String()) + + snapshot = testSnapshotWithProperties() + assert.Equal(t, `append, {"foo":"bar"}: id=25, parent_id=19, schema_id=3, sequence_number=200, timestamp_ms=1602638573590, manifest_list=s3:/a/b/c.avro`, + snapshot.String()) +} From 5cf8a8fb3d0a8f9801e41f103fcfde338ae3ff57 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 11 Oct 2023 11:16:59 -0400 Subject: [PATCH 6/6] some updates from comments --- go.sum | 2 -- table/metadata.go | 10 +++++----- table/metadata_test.go | 4 ++-- table/snapshots_test.go | 12 ++++++------ 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/go.sum b/go.sum index fe1e9a8..6e1fc0f 100644 --- a/go.sum +++ b/go.sum @@ -70,8 +70,6 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/wolfeidau/s3iofs v1.3.0 h1:O6lf4HCZVuHMjCdMYrpQIfKzLDYbO5BtNrH5RdQGv4c= github.com/wolfeidau/s3iofs v1.3.0/go.mod h1:oWH749wXNok7pJ+5MK7mi5uyS7GqQzeGApEGwUSaIjc= -golang.org/x/exp 
v0.0.0-20230811145659-89c5cff77bcb h1:mIKbk8weKhSeLH2GmUTrvx8CjkyJmnU1wFmg59CUjFA= -golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= diff --git a/table/metadata.go b/table/metadata.go index 893d062..957e163 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -45,10 +45,10 @@ type Metadata interface { // the table was last updated. Each table metadata file should update this // field just before writing. LastUpdatedMillis() int64 - // LastColumn returns the highest assigned column ID for the table. + // LastColumnID returns the highest assigned column ID for the table. // This is used to ensure fields are always assigned an unused ID when // evolving schemas. - LastColumn() int + LastColumnID() int // Schemas returns the list of schemas, stored as objects with their // schema-id. 
Schemas() []*iceberg.Schema @@ -140,7 +140,7 @@ type commonMetadata struct { UUID uuid.UUID `json:"table-uuid"` Loc string `json:"location"` LastUpdatedMS int64 `json:"last-updated-ms"` - LastColumnID int `json:"last-column-id"` + LastColumnId int `json:"last-column-id"` SchemaList []*iceberg.Schema `json:"schemas"` CurrentSchemaID int `json:"current-schema-id"` Specs []iceberg.PartitionSpec `json:"partition-specs"` @@ -159,7 +159,7 @@ type commonMetadata struct { func (c *commonMetadata) TableUUID() uuid.UUID { return c.UUID } func (c *commonMetadata) Location() string { return c.Loc } func (c *commonMetadata) LastUpdatedMillis() int64 { return c.LastUpdatedMS } -func (c *commonMetadata) LastColumn() int { return c.LastColumnID } +func (c *commonMetadata) LastColumnID() int { return c.LastColumnId } func (c *commonMetadata) Schemas() []*iceberg.Schema { return c.SchemaList } func (c *commonMetadata) CurrentSchema() *iceberg.Schema { for _, s := range c.SchemaList { @@ -314,7 +314,7 @@ func (c *commonMetadata) validate() error { case c.LastUpdatedMS == 0: // last-updated-ms is required return fmt.Errorf("%w: missing last-updated-ms", ErrInvalidMetadata) - case c.LastColumnID == 0: + case c.LastColumnId == 0: // last-column-id is required return fmt.Errorf("%w: missing last-column-id", ErrInvalidMetadata) } diff --git a/table/metadata_test.go b/table/metadata_test.go index 6547baa..080688f 100644 --- a/table/metadata_test.go +++ b/table/metadata_test.go @@ -122,7 +122,7 @@ func TestMetadataV1Parsing(t *testing.T) { assert.Equal(t, uuid.MustParse("d20125c8-7284-442c-9aea-15fee620737c"), meta.TableUUID()) assert.Equal(t, "s3://bucket/test/location", meta.Location()) assert.Equal(t, int64(1602638573874), meta.LastUpdatedMillis()) - assert.Equal(t, 3, meta.LastColumn()) + assert.Equal(t, 3, meta.LastColumnID()) expected := iceberg.NewSchema( 0, @@ -169,7 +169,7 @@ func TestMetadataV2Parsing(t *testing.T) { assert.Equal(t, "s3://bucket/test/location", data.Location()) 
assert.Equal(t, 34, data.LastSequenceNumber) assert.Equal(t, int64(1602638573590), data.LastUpdatedMS) - assert.Equal(t, 3, data.LastColumnID) + assert.Equal(t, 3, data.LastColumnId) assert.Equal(t, 0, data.SchemaList[0].ID) assert.Equal(t, 1, data.CurrentSchemaID) assert.Equal(t, 0, data.Specs[0].ID()) diff --git a/table/snapshots_test.go b/table/snapshots_test.go index f79f977..6a101a5 100644 --- a/table/snapshots_test.go +++ b/table/snapshots_test.go @@ -26,7 +26,7 @@ import ( "github.com/stretchr/testify/require" ) -func testSnapshot() table.Snapshot { +func Snapshot() table.Snapshot { parentID := int64(19) manifest, schemaid := "s3:/a/b/c.avro", 3 return table.Snapshot{ @@ -42,7 +42,7 @@ func testSnapshot() table.Snapshot { } } -func testSnapshotWithProperties() table.Snapshot { +func SnapshotWithProperties() table.Snapshot { parentID := int64(19) manifest, schemaid := "s3:/a/b/c.avro", 3 return table.Snapshot{ @@ -60,7 +60,7 @@ func testSnapshotWithProperties() table.Snapshot { } func TestSerializeSnapshot(t *testing.T) { - snapshot := testSnapshot() + snapshot := Snapshot() data, err := json.Marshal(snapshot) require.NoError(t, err) @@ -76,7 +76,7 @@ func TestSerializeSnapshot(t *testing.T) { } func TestSerializeSnapshotWithProps(t *testing.T) { - snapshot := testSnapshotWithProperties() + snapshot := SnapshotWithProperties() data, err := json.Marshal(snapshot) require.NoError(t, err) @@ -105,11 +105,11 @@ func TestInvalidOperation(t *testing.T) { } func TestSnapshotString(t *testing.T) { - snapshot := testSnapshot() + snapshot := Snapshot() assert.Equal(t, `append: id=25, parent_id=19, schema_id=3, sequence_number=200, timestamp_ms=1602638573590, manifest_list=s3:/a/b/c.avro`, snapshot.String()) - snapshot = testSnapshotWithProperties() + snapshot = SnapshotWithProperties() assert.Equal(t, `append, {"foo":"bar"}: id=25, parent_id=19, schema_id=3, sequence_number=200, timestamp_ms=1602638573590, manifest_list=s3:/a/b/c.avro`, snapshot.String()) }