diff --git a/catalog/catalog.go b/catalog/catalog.go index d6d7f1e..14c30f3 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -43,6 +43,7 @@ var ( ErrNoSuchTable = errors.New("table does not exist") ErrNoSuchNamespace = errors.New("namespace does not exist") ErrNamespaceAlreadyExists = errors.New("namespace already exists") + ErrTableAlreadyExists = errors.New("table already exists") ) // WithAwsConfig sets the AWS configuration for the catalog. @@ -147,8 +148,9 @@ type Catalog interface { ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) // LoadTable loads a table from the catalog and returns a Table with the metadata. LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) - // DropTable tells the catalog to drop the table entirely - DropTable(ctx context.Context, identifier table.Identifier) error + // DropTable tells the catalog to drop the table entirely. + // If the purge flag is set, it is requested to purge the underlying table's data and metadata. 
+ DropTable(ctx context.Context, identifier table.Identifier, purge bool) error // RenameTable tells the catalog to rename a given table by the identifiers // provided, and then loads and returns the destination table RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) diff --git a/catalog/glue.go b/catalog/glue.go index b398185..2336b35 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -125,7 +125,7 @@ func (c *GlueCatalog) CatalogType() CatalogType { return Glue } -func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier) error { +func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier, purge bool) error { return fmt.Errorf("%w: [Glue Catalog] drop table", iceberg.ErrNotImplemented) } diff --git a/catalog/rest.go b/catalog/rest.go index ef9c332..87d50a7 100644 --- a/catalog/rest.go +++ b/catalog/rest.go @@ -84,6 +84,86 @@ func (e errorResponse) Error() string { return e.Type + ": " + e.Message } +type identifier struct { + Namespace []string `json:"namespace"` + Name string `json:"name"` +} + +type commitTableResponse struct { + MetadataLoc string `json:"metadata-location"` + RawMetadata json.RawMessage `json:"metadata"` + Metadata table.Metadata `json:"-"` +} + +func (t *commitTableResponse) UnmarshalJSON(b []byte) (err error) { + type Alias commitTableResponse + if err = json.Unmarshal(b, (*Alias)(t)); err != nil { + return err + } + + t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata) + return +} + +type loadTableResponse struct { + MetadataLoc string `json:"metadata-location"` + RawMetadata json.RawMessage `json:"metadata"` + Config iceberg.Properties `json:"config"` + Metadata table.Metadata `json:"-"` +} + +func (t *loadTableResponse) UnmarshalJSON(b []byte) (err error) { + type Alias loadTableResponse + if err = json.Unmarshal(b, (*Alias)(t)); err != nil { + return err + } + + t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata) + return +} + +type 
createTableOption func(*createTableRequest) + +func WithLocation(loc string) createTableOption { + return func(req *createTableRequest) { + req.Location = strings.TrimRight(loc, "/") + } +} + +func WithPartitionSpec(spec *iceberg.PartitionSpec) createTableOption { + return func(req *createTableRequest) { + req.PartitionSpec = spec + } +} + +func WithWriteOrder(order *table.SortOrder) createTableOption { + return func(req *createTableRequest) { + req.WriteOrder = order + } +} + +func WithStageCreate() createTableOption { + return func(req *createTableRequest) { + req.StageCreate = true + } +} + +func WithProperties(props iceberg.Properties) createTableOption { + return func(req *createTableRequest) { + req.Props = props + } +} + +type createTableRequest struct { + Name string `json:"name"` + Schema *iceberg.Schema `json:"schema"` + Location string `json:"location,omitempty"` + PartitionSpec *iceberg.PartitionSpec `json:"partition-spec,omitempty"` + WriteOrder *table.SortOrder `json:"write-order,omitempty"` + StageCreate bool `json:"stage-create,omitempty"` + Props iceberg.Properties `json:"properties,omitempty"` +} + type oauthTokenResponse struct { AccessToken string `json:"access_token"` TokenType string `json:"token_type"` @@ -537,6 +617,20 @@ func checkValidNamespace(ident table.Identifier) error { return nil } +func (r *RestCatalog) tableFromResponse(identifier []string, metadata table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) { + id := identifier + if r.name != "" { + id = append([]string{r.name}, identifier...) 
+ } + + iofs, err := iceio.LoadFS(config, loc) + if err != nil { + return nil, err + } + + return table.New(id, metadata, loc, iofs), nil +} + func (r *RestCatalog) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) { if err := checkValidNamespace(namespace); err != nil { return nil, err @@ -546,12 +640,8 @@ func (r *RestCatalog) ListTables(ctx context.Context, namespace table.Identifier path := []string{"namespaces", ns, "tables"} type resp struct { - Identifiers []struct { - Namespace []string `json:"namespace"` - Name string `json:"name"` - } `json:"identifiers"` + Identifiers []identifier `json:"identifiers"` } - rsp, err := doGet[resp](ctx, r.baseURI, path, r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace}) if err != nil { return nil, err @@ -573,64 +663,146 @@ func splitIdentForPath(ident table.Identifier) (string, string, error) { return strings.Join(NamespaceFromIdent(ident), namespaceSeparator), TableNameFromIdent(ident), nil } -type tblResponse struct { - MetadataLoc string `json:"metadata-location"` - RawMetadata json.RawMessage `json:"metadata"` - Config iceberg.Properties `json:"config"` - Metadata table.Metadata `json:"-"` -} +func (r *RestCatalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...createTableOption) (*table.Table, error) { + ns, tbl, err := splitIdentForPath(identifier) + if err != nil { + return nil, err + } -func (t *tblResponse) UnmarshalJSON(b []byte) (err error) { - type Alias tblResponse - if err = json.Unmarshal(b, (*Alias)(t)); err != nil { - return err + payload := createTableRequest{ + Name: tbl, + Schema: schema, + } + for _, o := range opts { + o(&payload) } - t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata) - return + ret, err := doPost[createTableRequest, loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables"}, payload, + r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace, http.StatusConflict: 
ErrTableAlreadyExists}) + if err != nil { + return nil, err + } + + config := maps.Clone(r.props) + maps.Copy(config, ret.Metadata.Properties()) + maps.Copy(config, ret.Config) + + return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, config) } -func (r *RestCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) { +func (r *RestCatalog) RegisterTable(ctx context.Context, identifier table.Identifier, metadataLoc string) (*table.Table, error) { ns, tbl, err := splitIdentForPath(identifier) if err != nil { return nil, err } - if props == nil { - props = iceberg.Properties{} + type payload struct { + Name string `json:"name"` + MetadataLoc string `json:"metadata-location"` } - ret, err := doGet[tblResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl}, - r.cl, map[int]error{http.StatusNotFound: ErrNoSuchTable}) + ret, err := doPost[payload, loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl}, + payload{Name: tbl, MetadataLoc: metadataLoc}, r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace, http.StatusConflict: ErrTableAlreadyExists}) if err != nil { return nil, err } - id := identifier - if r.name != "" { - id = append([]string{r.name}, identifier...) 
+ config := maps.Clone(r.props) + maps.Copy(config, ret.Metadata.Properties()) + maps.Copy(config, ret.Config) + return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, config) +} + +func (r *RestCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) { + ns, tbl, err := splitIdentForPath(identifier) + if err != nil { + return nil, err + } + + ret, err := doGet[loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl}, + r.cl, map[int]error{http.StatusNotFound: ErrNoSuchTable}) + if err != nil { + return nil, err } - tblProps := maps.Clone(r.props) - maps.Copy(tblProps, props) - maps.Copy(tblProps, ret.Metadata.Properties()) + config := maps.Clone(r.props) + maps.Copy(config, props) + maps.Copy(config, ret.Metadata.Properties()) for k, v := range ret.Config { - tblProps[k] = v + config[k] = v } - iofs, err := iceio.LoadFS(tblProps, ret.MetadataLoc) + return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, config) +} + +func (r *RestCatalog) UpdateTable(ctx context.Context, ident table.Identifier, requirements []table.Requirement, updates []table.Update) (*table.Table, error) { + ns, tbl, err := splitIdentForPath(ident) if err != nil { return nil, err } - return table.New(id, ret.Metadata, ret.MetadataLoc, iofs), nil + + restIdentifier := identifier{ + Namespace: NamespaceFromIdent(ident), + Name: tbl, + } + type payload struct { + Identifier identifier `json:"identifier"` + Requirements []table.Requirement `json:"requirements"` + Updates []table.Update `json:"updates"` + } + ret, err := doPost[payload, commitTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl}, + payload{Identifier: restIdentifier, Requirements: requirements, Updates: updates}, r.cl, + map[int]error{http.StatusNotFound: ErrNoSuchTable, http.StatusConflict: ErrCommitFailed}) + if err != nil { + return nil, err + } + + config := maps.Clone(r.props) + maps.Copy(config, 
ret.Metadata.Properties()) + + return r.tableFromResponse(ident, ret.Metadata, ret.MetadataLoc, config) } -func (r *RestCatalog) DropTable(ctx context.Context, identifier table.Identifier) error { - return fmt.Errorf("%w: [Rest Catalog] drop table", iceberg.ErrNotImplemented) +func (r *RestCatalog) DropTable(ctx context.Context, identifier table.Identifier, purge bool) error { + ns, tbl, err := splitIdentForPath(identifier) + if err != nil { + return err + } + + uri := r.baseURI.JoinPath("namespaces", ns, "tables", tbl) + if purge { + v := url.Values{} + v.Set("purgeRequested", "true") + uri.RawQuery = v.Encode() + } + + _, err = doDelete[struct{}](ctx, uri, []string{}, r.cl, + map[int]error{http.StatusNotFound: ErrNoSuchTable}) + + return err } func (r *RestCatalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) { - return nil, fmt.Errorf("%w: [Rest Catalog] rename table", iceberg.ErrNotImplemented) + type payload struct { + From identifier `json:"from"` + To identifier `json:"to"` + } + f := identifier{ + Namespace: NamespaceFromIdent(from), + Name: TableNameFromIdent(from), + } + t := identifier{ + Namespace: NamespaceFromIdent(to), + Name: TableNameFromIdent(to), + } + + _, err := doPost[payload, any](ctx, r.baseURI, []string{"tables", "rename"}, payload{From: f, To: t}, r.cl, + map[int]error{http.StatusNotFound: ErrNoSuchTable}) + if err != nil { + return nil, err + } + + return r.LoadTable(ctx, to, nil) } func (r *RestCatalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error { @@ -710,3 +882,20 @@ func (r *RestCatalog) UpdateNamespaceProperties(ctx context.Context, namespace t return doPost[payload, PropertiesUpdateSummary](ctx, r.baseURI, []string{"namespaces", ns, "properties"}, payload{Remove: removals, Updates: updates}, r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace}) } + +func (r *RestCatalog) CheckNamespaceExists(ctx context.Context, namespace 
table.Identifier) (bool, error) { + if err := checkValidNamespace(namespace); err != nil { + return false, err + } + + _, err := doGet[struct{}](ctx, r.baseURI, []string{"namespaces", strings.Join(namespace, namespaceSeparator)}, + r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace}) + if err != nil { + if errors.Is(err, ErrNoSuchNamespace) { + return false, nil + } + return false, err + } + + return true, nil +} diff --git a/catalog/rest_test.go b/catalog/rest_test.go index 618c5e0..212d5a8 100644 --- a/catalog/rest_test.go +++ b/catalog/rest_test.go @@ -22,6 +22,7 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" + "fmt" "net/http" "net/http/httptest" "net/url" @@ -624,6 +625,105 @@ func (r *RestCatalogSuite) TestUpdateNamespaceProps404() { r.ErrorContains(err, "Namespace does not exist: does_not_exist in warehouse") } +var ( + exampleTableMetadataNoSnapshotV1 = `{ + "format-version": 1, + "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29", + "location": "s3://warehouse/database/table", + "last-updated-ms": 1657810967051, + "last-column-id": 3, + "schema": { + "type": "struct", + "schema-id": 0, + "identifier-field-ids": [2], + "fields": [ + {"id": 1, "name": "foo", "required": false, "type": "string"}, + {"id": 2, "name": "bar", "required": true, "type": "int"}, + {"id": 3, "name": "baz", "required": false, "type": "boolean"} + ] + }, + "current-schema-id": 0, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "identifier-field-ids": [2], + "fields": [ + {"id": 1, "name": "foo", "required": false, "type": "string"}, + {"id": 2, "name": "bar", "required": true, "type": "int"}, + {"id": 3, "name": "baz", "required": false, "type": "boolean"} + ] + } + ], + "partition-spec": [], + "default-spec-id": 0, + "last-partition-id": 999, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": { + "write.delete.parquet.compression-codec": "zstd", + "write.metadata.compression-codec": "gzip", + 
"write.summary.partition-limit": "100", + "write.parquet.compression-codec": "zstd" + }, + "current-snapshot-id": -1, + "refs": {}, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [] +}` + + createTableRestExample = fmt.Sprintf(`{ + "metadata-location": "s3://warehouse/database/table/metadata.json", + "metadata": %s, + "config": { + "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory", + "region": "us-west-2" + } +}`, exampleTableMetadataNoSnapshotV1) + + tableSchemaSimple = iceberg.NewSchemaWithIdentifiers(1, []int{2}, + iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: false}, + iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Bool, Required: false}, + ) +) + +func (r *RestCatalogSuite) TestCreateTable200() { + r.mux.HandleFunc("/v1/namespaces/fokko/tables", func(w http.ResponseWriter, req *http.Request) { + r.Require().Equal(http.MethodPost, req.Method) + + for k, v := range TestHeaders { + r.Equal(v, req.Header.Values(k)) + } + + w.Write([]byte(createTableRestExample)) + }) + + t := createTableRestExample + _ = t + cat, err := catalog.NewRestCatalog("rest", r.srv.URL, catalog.WithOAuthToken(TestToken)) + r.Require().NoError(err) + + tbl, err := cat.CreateTable( + context.Background(), + catalog.ToRestIdentifier("fokko", "fokko2"), + tableSchemaSimple, + ) + r.Require().NoError(err) + + r.Equal(catalog.ToRestIdentifier("rest", "fokko", "fokko2"), tbl.Identifier()) + r.Equal("s3://warehouse/database/table/metadata.json", tbl.MetadataLocation()) + r.EqualValues(1, tbl.Metadata().Version()) + r.Equal("bf289591-dcc0-4234-ad4f-5c3eed811a29", tbl.Metadata().TableUUID().String()) + r.EqualValues(1657810967051, tbl.Metadata().LastUpdatedMillis()) + r.Equal(3, tbl.Metadata().LastColumnID()) + r.Zero(tbl.Schema().ID) + r.Zero(tbl.Metadata().DefaultPartitionSpec()) + r.Equal(999, 
*tbl.Metadata().LastPartitionSpecID()) + r.Equal(table.UnsortedSortOrder, tbl.SortOrder()) +} + func (r *RestCatalogSuite) TestLoadTable200() { r.mux.HandleFunc("/v1/namespaces/fokko/tables/table", func(w http.ResponseWriter, req *http.Request) { r.Require().Equal(http.MethodGet, req.Method) diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go index fb25618..6d3f4af 100644 --- a/cmd/iceberg/main.go +++ b/cmd/iceberg/main.go @@ -185,7 +185,7 @@ func main() { os.Exit(1) } case cfg.Table: - err := cat.DropTable(context.Background(), catalog.ToRestIdentifier(cfg.Ident)) + err := cat.DropTable(context.Background(), catalog.ToRestIdentifier(cfg.Ident), false) if err != nil { output.Error(err) os.Exit(1) diff --git a/go.mod b/go.mod index aeccfa1..3704529 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ module github.com/apache/iceberg-go -go 1.21 +go 1.23 require ( github.com/apache/arrow/go/v16 v16.1.0 @@ -54,30 +54,34 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 // indirect - github.com/containerd/console v1.0.3 // indirect + github.com/containerd/console v1.0.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/goccy/go-json v0.10.2 // indirect + github.com/goccy/go-json v0.10.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect github.com/gookit/color v1.5.4 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/kr/pretty v0.3.1 // indirect github.com/lithammer/fuzzysearch v1.1.8 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect 
github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rivo/uniseg v0.4.4 // indirect + github.com/rivo/uniseg v0.4.7 // indirect + github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect - golang.org/x/mod v0.19.0 // indirect - golang.org/x/net v0.27.0 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.22.0 // indirect - golang.org/x/term v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect - golang.org/x/tools v0.23.0 // indirect + golang.org/x/mod v0.20.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/term v0.23.0 // indirect + golang.org/x/text v0.17.0 // indirect + golang.org/x/tools v0.24.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index d0c23a9..82d745d 100644 --- a/go.sum +++ b/go.sum @@ -56,15 +56,17 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 h1:OMsEmCyz2i89XwRwPouAJvhj81wI github.com/aws/aws-sdk-go-v2/service/sts v1.30.5/go.mod h1:vmSqFK+BVIwVpDAGZB3CoCXHzurt4qBE8lf+I/kRTh0= github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= -github.com/containerd/console v1.0.3 h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw= github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U= +github.com/containerd/console v1.0.4 h1:F2g4+oChYvBTsASRTz8NP6iIAi97J3TtSAsLbIFn4ro= +github.com/containerd/console v1.0.4/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 h1:bWDMxwH3px2JBh6AyO7hdCn/PkvCZXii8TGj7sbtEbQ= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= -github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= -github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= +github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= @@ -85,10 +87,12 @@ github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ib github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.10/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= -github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= -github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1/go.mod 
h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -105,6 +109,7 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pterm/pterm v0.12.27/go.mod h1:PhQ89w4i95rhgE+xedAoqous6K9X+r6aSOI2eFF7DZI= @@ -117,8 +122,11 @@ github.com/pterm/pterm v0.12.40/go.mod h1:ffwPLwlbXxP+rxT0GsgDTzS3y3rmpAO1NMjUkG github.com/pterm/pterm v0.12.79 h1:lH3yrYMhdpeqX9y5Ep1u7DejyHy7NSQg9qrBjF9dFT4= github.com/pterm/pterm v0.12.79/go.mod h1:1v/gzOF1N0FsjbgTHZ1wVycRkKiatFvJSJC4IGaQAAo= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= -github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= -github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= 
+github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -146,19 +154,19 @@ golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUF golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.19.0 h1:fEdghXQSo20giMthA7cd28ZC+jts4amQ3YMXiP5oMQ8= -golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0= +golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync 
v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -168,35 +176,37 @@ golang.org/x/sys v0.0.0-20211013075003-97ac67df715c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod 
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= -golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= +golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.23.0 h1:SGsXPZ+2l4JsgaCKkx+FQ9YZ5XEtA1GZYuoDjenLjvg= -golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI= +golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= golang.org/x/xerrors 
v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/partitions.go b/partitions.go index 321af2e..9416d70 100644 --- a/partitions.go +++ b/partitions.go @@ -20,9 +20,9 @@ package iceberg import ( "encoding/json" "fmt" + "iter" + "slices" "strings" - - "golang.org/x/exp/slices" ) const ( @@ -117,6 +117,11 @@ func (ps PartitionSpec) Equals(other PartitionSpec) bool { return ps.id == other.id && slices.Equal(ps.fields, other.fields) } +// Fields returns an iterator over the partition fields in this spec. 
+func (ps *PartitionSpec) Fields() iter.Seq[PartitionField] { + return slices.Values(ps.fields) +} + func (ps PartitionSpec) MarshalJSON() ([]byte, error) { if ps.fields == nil { ps.fields = []PartitionField{} diff --git a/table/metadata.go b/table/metadata.go index 47b3ffe..b506575 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -22,14 +22,26 @@ import ( "errors" "fmt" "io" + "iter" "maps" "slices" + "time" "github.com/apache/iceberg-go" "github.com/google/uuid" ) +const ( + partitionFieldStartID = 1000 + supportedTableFormatVersion = 2 + + addPartionSpecAction = "add-partition-spec" + addSchemaAction = "add-schema" + addSnapshotAction = "add-snapshot" + addSortOrderAction = "add-sort-order" +) + // Metadata for an iceberg table as specified in the Iceberg spec // // https://iceberg.apache.org/spec/#iceberg-table-spec @@ -80,20 +92,544 @@ type Metadata interface { SnapshotByName(name string) *Snapshot // CurrentSnapshot returns the table's current snapshot. CurrentSnapshot() *Snapshot + // Ref returns the snapshot ref for the main branch. + Ref() SnapshotRef + // Refs returns a map of snapshot refs by name. + Refs() iter.Seq2[string, SnapshotRef] + // SnapshotLogs returns the list of snapshot logs for the table. + SnapshotLogs() iter.Seq[SnapshotLogEntry] // SortOrder returns the table's current sort order, ie: the one with the // ID that matches the default-sort-order-id. SortOrder() SortOrder // SortOrders returns the list of sort orders in the table. SortOrders() []SortOrder + // DefaultSortOrder returns the ID of the current sort order that writers + // should use by default. + DefaultSortOrder() int // Properties is a string to string map of table properties. This is used // to control settings that affect reading and writing and is not intended // to be used for arbitrary metadata. For example, commit.retry.num-retries // is used to control the number of commit retries. 
Properties() iceberg.Properties + // PreviousFiles returns the list of metadata log entries for the table. + PreviousFiles() iter.Seq[MetadataLogEntry] Equals(Metadata) bool } +type MetadataBuilder struct { + base Metadata + updates []Update + + // common fields + formatVersion int + uuid uuid.UUID + loc string + lastUpdatedMS int64 + lastColumnId int + schemaList []*iceberg.Schema + currentSchemaID int + specs []iceberg.PartitionSpec + defaultSpecID int + lastPartitionID *int + props iceberg.Properties + snapshotList []Snapshot + currentSnapshotID *int64 + snapshotLog []SnapshotLogEntry + metadataLog []MetadataLogEntry + sortOrderList []SortOrder + defaultSortOrderID int + refs map[string]SnapshotRef + + // V2 specific + lastSequenceNumber *int64 +} + +func NewMetadataBuilder() (*MetadataBuilder, error) { + return &MetadataBuilder{ + updates: make([]Update, 0), + schemaList: make([]*iceberg.Schema, 0), + specs: make([]iceberg.PartitionSpec, 0), + props: make(iceberg.Properties), + snapshotList: make([]Snapshot, 0), + snapshotLog: make([]SnapshotLogEntry, 0), + metadataLog: make([]MetadataLogEntry, 0), + sortOrderList: make([]SortOrder, 0), + refs: make(map[string]SnapshotRef), + }, nil +} + +func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) { + b := &MetadataBuilder{} + b.base = metadata + + b.formatVersion = metadata.Version() + b.uuid = metadata.TableUUID() + b.loc = metadata.Location() + b.lastUpdatedMS = metadata.LastUpdatedMillis() + b.lastColumnId = metadata.LastColumnID() + b.schemaList = metadata.Schemas() + b.currentSchemaID = metadata.CurrentSchema().ID + b.specs = metadata.PartitionSpecs() + b.defaultSpecID = metadata.DefaultPartitionSpec() + b.lastPartitionID = metadata.LastPartitionSpecID() + b.props = metadata.Properties() + b.snapshotList = metadata.Snapshots() + b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID + b.sortOrderList = metadata.SortOrders() + b.defaultSortOrderID = metadata.DefaultSortOrder() + + 
b.refs = make(map[string]SnapshotRef) + for name, ref := range metadata.Refs() { + b.refs[name] = ref + } + + b.snapshotLog = make([]SnapshotLogEntry, 0) + for log := range metadata.SnapshotLogs() { + b.snapshotLog = append(b.snapshotLog, log) + } + + b.metadataLog = make([]MetadataLogEntry, 0) + for entry := range metadata.PreviousFiles() { + b.metadataLog = append(b.metadataLog, entry) + } + + return b, nil +} + +func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID int, initial bool) (*MetadataBuilder, error) { + if newLastColumnID < b.lastColumnId { + return nil, fmt.Errorf("%w: newLastColumnID %d, must be >= %d", iceberg.ErrInvalidArgument, newLastColumnID, b.lastColumnId) + } + + var schemas []*iceberg.Schema + if initial { + schemas = []*iceberg.Schema{schema} + } else { + schemas = append(b.schemaList, schema) + } + + b.lastColumnId = newLastColumnID + b.schemaList = schemas + b.updates = append(b.updates, NewAddSchemaUpdate(schema, newLastColumnID, initial)) + + return b, nil +} + +func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec, initial bool) (*MetadataBuilder, error) { + for _, s := range b.specs { + if s.ID() == spec.ID() && !initial { + return nil, fmt.Errorf("partition spec with id %d already exists", spec.ID()) + } + } + + maxFieldID := 0 + for f := range spec.Fields() { + maxFieldID = max(maxFieldID, f.FieldID) + } + + prev := partitionFieldStartID - 1 + if b.lastPartitionID != nil { + prev = *b.lastPartitionID + } + lastPartitionID := max(maxFieldID, prev) + + var specs []iceberg.PartitionSpec + if initial { + specs = []iceberg.PartitionSpec{*spec} + } else { + specs = append(b.specs, *spec) + } + + b.specs = specs + b.lastPartitionID = &lastPartitionID + b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial)) + + return b, nil +} + +func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder, error) { + if snapshot == nil { + return nil, nil + } + + if len(b.schemaList) 
== 0 { + return nil, errors.New("can't add snapshot with no added schemas") + } else if len(b.specs) == 0 { + return nil, errors.New("can't add snapshot with no added partition specs") + } else if s, _ := b.SnapshotByID(snapshot.SnapshotID); s != nil { + return nil, fmt.Errorf("can't add snapshot with id %d, already exists", snapshot.SnapshotID) + } else if b.formatVersion == 2 && + snapshot.SequenceNumber > 0 && + snapshot.SequenceNumber <= *b.lastSequenceNumber && + snapshot.ParentSnapshotID != nil { + return nil, fmt.Errorf("can't add snapshot with sequence number %d, must be > than last sequence number %d", + snapshot.SequenceNumber, b.lastSequenceNumber) + } + + b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot)) + b.lastUpdatedMS = snapshot.TimestampMs + b.lastSequenceNumber = &snapshot.SequenceNumber + b.snapshotList = append(b.snapshotList, *snapshot) + return b, nil +} + +func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder, initial bool) (*MetadataBuilder, error) { + for _, s := range b.sortOrderList { + if s.OrderID == sortOrder.OrderID && !initial { + return nil, fmt.Errorf("sort order with id %d already exists", sortOrder.OrderID) + } + } + + var sortOrders []SortOrder + if initial { + sortOrders = []SortOrder{*sortOrder} + } else { + sortOrders = append(b.sortOrderList, *sortOrder) + } + + b.sortOrderList = sortOrders + b.updates = append(b.updates, NewAddSortOrderUpdate(sortOrder, initial)) + + return b, nil +} + +func (b *MetadataBuilder) RemoveProperties(keys []string) (*MetadataBuilder, error) { + if len(keys) == 0 { + return b, nil + } + + b.updates = append(b.updates, NewRemovePropertiesUpdate(keys)) + for _, key := range keys { + delete(b.props, key) + } + + return b, nil +} + +func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) (*MetadataBuilder, error) { + if currentSchemaID == -1 { + currentSchemaID = maxBy(b.schemaList, func(s *iceberg.Schema) int { + return s.ID + }) + if !slices.ContainsFunc(b.updates, 
func(u Update) bool { + return u.Action() == addSchemaAction && u.(*addSchemaUpdate).Schema.ID == currentSchemaID + }) { + return nil, errors.New("can't set current schema to last added schema, no schema has been added") + } + } + + if currentSchemaID == b.currentSchemaID { + return b, nil + } + + _, err := b.GetSchemaByID(currentSchemaID) + if err != nil { + return nil, fmt.Errorf("can't set current schema to schema with id %d: %w", currentSchemaID, err) + } + + b.updates = append(b.updates, NewSetCurrentSchemaUpdate(currentSchemaID)) + b.currentSchemaID = currentSchemaID + return b, nil +} + +func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int) (*MetadataBuilder, error) { + if defaultSortOrderID == -1 { + defaultSortOrderID = maxBy(b.sortOrderList, func(s SortOrder) int { + return s.OrderID + }) + if !slices.ContainsFunc(b.updates, func(u Update) bool { + return u.Action() == addSortOrderAction && u.(*addSortOrderUpdate).SortOrder.OrderID == defaultSortOrderID + }) { + return nil, fmt.Errorf("can't set default sort order to last added with no added sort orders") + } + } + + if defaultSortOrderID == b.defaultSortOrderID { + return b, nil + } + + if _, err := b.GetSortOrderByID(defaultSortOrderID); err != nil { + return nil, fmt.Errorf("can't set default sort order to sort order with id %d: %w", defaultSortOrderID, err) + } + + b.updates = append(b.updates, NewSetDefaultSortOrderUpdate(defaultSortOrderID)) + b.defaultSortOrderID = defaultSortOrderID + return b, nil +} + +func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) (*MetadataBuilder, error) { + if defaultSpecID == -1 { + defaultSpecID = maxBy(b.specs, func(s iceberg.PartitionSpec) int { + return s.ID() + }) + if !slices.ContainsFunc(b.updates, func(u Update) bool { + return u.Action() == addPartionSpecAction && u.(*addPartitionSpecUpdate).Spec.ID() == defaultSpecID + }) { + return nil, fmt.Errorf("can't set default spec to last added with no added partition specs") + } + } + + if 
defaultSpecID == b.defaultSpecID { + return b, nil + } + + if _, err := b.GetSpecByID(defaultSpecID); err != nil { + return nil, fmt.Errorf("can't set default spec to spec with id %d: %w", defaultSpecID, err) + } + + b.updates = append(b.updates, NewSetDefaultSpecUpdate(defaultSpecID)) + b.defaultSpecID = defaultSpecID + return b, nil +} + +func (b *MetadataBuilder) SetFormatVersion(formatVersion int) (*MetadataBuilder, error) { + if formatVersion < b.formatVersion { + return nil, fmt.Errorf("downgrading format version from %d to %d is not allowed", + b.formatVersion, formatVersion) + } + + if formatVersion > supportedTableFormatVersion { + return nil, fmt.Errorf("unsupported format version %d", formatVersion) + } + + if formatVersion == b.formatVersion { + return b, nil + } + + b.updates = append(b.updates, NewUpgradeFormatVersionUpdate(formatVersion)) + b.formatVersion = formatVersion + return b, nil +} + +func (b *MetadataBuilder) SetLoc(loc string) (*MetadataBuilder, error) { + if b.loc == loc { + return b, nil + } + + b.updates = append(b.updates, NewSetLocationUpdate(loc)) + b.loc = loc + return b, nil +} + +func (b *MetadataBuilder) SetProperties(props iceberg.Properties) (*MetadataBuilder, error) { + if len(props) == 0 { + return b, nil + } + + b.updates = append(b.updates, NewSetPropertiesUpdate(props)) + maps.Copy(b.props, props) + return b, nil +} + +type setSnapshotRefOption func(*SnapshotRef) error + +func WithMaxRefAgeMs(maxRefAgeMs int64) setSnapshotRefOption { + return func(ref *SnapshotRef) error { + if maxRefAgeMs <= 0 { + return fmt.Errorf("%w: maxRefAgeMs %d, must be > 0", iceberg.ErrInvalidArgument, maxRefAgeMs) + } + ref.MaxRefAgeMs = &maxRefAgeMs + return nil + } +} + +func WithMaxSnapshotAgeMs(maxSnapshotAgeMs int64) setSnapshotRefOption { + return func(ref *SnapshotRef) error { + if maxSnapshotAgeMs <= 0 { + return fmt.Errorf("%w: maxSnapshotAgeMs %d, must be > 0", iceberg.ErrInvalidArgument, maxSnapshotAgeMs) + } + ref.MaxSnapshotAgeMs = 
&maxSnapshotAgeMs + return nil + } +} + +func WithMinSnapshotsToKeep(minSnapshotsToKeep int) setSnapshotRefOption { + return func(ref *SnapshotRef) error { + if minSnapshotsToKeep <= 0 { + return fmt.Errorf("%w: minSnapshotsToKeep %d, must be > 0", iceberg.ErrInvalidArgument, minSnapshotsToKeep) + } + ref.MinSnapshotsToKeep = &minSnapshotsToKeep + return nil + } +} + +func (b *MetadataBuilder) SetSnapshotRef( + name string, + snapshotID int64, + refType RefType, + options ...setSnapshotRefOption, +) (*MetadataBuilder, error) { + ref := SnapshotRef{ + SnapshotID: snapshotID, + SnapshotRefType: refType, + } + for _, opt := range options { + if err := opt(&ref); err != nil { + return nil, fmt.Errorf("invalid snapshot ref option: %w", err) + } + } + + var maxRefAgeMs, maxSnapshotAgeMs int64 + var minSnapshotsToKeep int + if ref.MaxRefAgeMs != nil { + maxRefAgeMs = *ref.MaxRefAgeMs + } + if ref.MaxSnapshotAgeMs != nil { + maxSnapshotAgeMs = *ref.MaxSnapshotAgeMs + } + if ref.MinSnapshotsToKeep != nil { + minSnapshotsToKeep = *ref.MinSnapshotsToKeep + } + + if existingRef, ok := b.refs[name]; ok && existingRef.Equals(ref) { + return b, nil + } + + snapshot, err := b.SnapshotByID(snapshotID) + if err != nil { + return nil, fmt.Errorf("can't set snapshot ref %s to unknown snapshot %d: %w", name, snapshotID, err) + } + + if refType == MainBranch { + b.updates = append(b.updates, NewSetSnapshotRefUpdate(name, snapshotID, refType, maxRefAgeMs, maxSnapshotAgeMs, minSnapshotsToKeep)) + b.currentSnapshotID = &snapshotID + b.snapshotLog = append(b.snapshotLog, SnapshotLogEntry{ + SnapshotID: snapshotID, + TimestampMs: snapshot.TimestampMs, + }) + b.lastUpdatedMS = time.Now().Local().UnixMilli() + } + + if slices.ContainsFunc(b.updates, func(u Update) bool { + return u.Action() == addSnapshotAction && u.(*addSnapshotUpdate).Snapshot.SnapshotID == snapshotID + }) { + b.lastUpdatedMS = snapshot.TimestampMs + } + + b.refs[name] = ref + return b, nil +} + +func (b *MetadataBuilder) 
SetUUID(uuid uuid.UUID) (*MetadataBuilder, error) { + if b.uuid == uuid { + return b, nil + } + + b.updates = append(b.updates, NewAssignUUIDUpdate(uuid)) + b.uuid = uuid + return b, nil +} + +func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata { + return &commonMetadata{ + FormatVersion: b.formatVersion, + UUID: b.uuid, + Loc: b.loc, + LastUpdatedMS: b.lastUpdatedMS, + LastColumnId: b.lastColumnId, + SchemaList: b.schemaList, + CurrentSchemaID: b.currentSchemaID, + Specs: b.specs, + DefaultSpecID: b.defaultSpecID, + LastPartitionID: b.lastPartitionID, + Props: b.props, + SnapshotList: b.snapshotList, + CurrentSnapshotID: b.currentSnapshotID, + SnapshotLog: b.snapshotLog, + MetadataLog: b.metadataLog, + SortOrderList: b.sortOrderList, + DefaultSortOrderID: b.defaultSortOrderID, + SnapshotRefs: b.refs, + } +} + +func (b *MetadataBuilder) GetSchemaByID(id int) (*iceberg.Schema, error) { + for _, s := range b.schemaList { + if s.ID == id { + return s, nil + } + } + + return nil, fmt.Errorf("%w: schema with id %d not found", iceberg.ErrInvalidArgument, id) +} + +func (b *MetadataBuilder) GetSpecByID(id int) (*iceberg.PartitionSpec, error) { + for _, s := range b.specs { + if s.ID() == id { + return &s, nil + } + } + + return nil, fmt.Errorf("partition spec with id %d not found", id) +} + +func (b *MetadataBuilder) GetSortOrderByID(id int) (*SortOrder, error) { + for _, s := range b.sortOrderList { + if s.OrderID == id { + return &s, nil + } + } + + return nil, fmt.Errorf("sort order with id %d not found", id) +} + +func (b *MetadataBuilder) SnapshotByID(id int64) (*Snapshot, error) { + for _, s := range b.snapshotList { + if s.SnapshotID == id { + return &s, nil + } + } + + return nil, fmt.Errorf("snapshot with id %d not found", id) +} + +func (b *MetadataBuilder) Build() (Metadata, error) { + common := b.buildCommonMetadata() + switch b.formatVersion { + case 1: + schema, err := b.GetSchemaByID(b.currentSchemaID) + if err != nil { + return nil, 
fmt.Errorf("can't build metadata, missing schema for schema ID %d: %w", b.currentSchemaID, err) + } + + partition, err := b.GetSpecByID(b.defaultSpecID) + if err != nil { + return nil, fmt.Errorf("can't build metadata, missing partition spec for spec ID %d: %w", b.defaultSpecID, err) + } + + partitionFields := make([]iceberg.PartitionField, 0) + for f := range partition.Fields() { + partitionFields = append(partitionFields, f) + } + + return &metadataV1{ + Schema: schema, + Partition: partitionFields, + commonMetadata: *common, + }, nil + + case 2: + return &metadataV2{ + LastSequenceNumber: *b.lastSequenceNumber, + commonMetadata: *common, + }, nil + + default: + panic("unreachable: invalid format version") + } +} + +// maxBy returns the maximum value of extract(e) for all e in elems. +// If elems is empty, returns 0. +func maxBy[S ~[]E, E any](elems S, extract func(e E) int) int { + m := 0 + for _, e := range elems { + m = max(m, extract(e)) + } + return m +} + var ( ErrInvalidMetadataFormatVersion = errors.New("invalid or missing format-version in table metadata") ErrInvalidMetadata = errors.New("invalid metadata") @@ -128,9 +664,9 @@ func ParseMetadataBytes(b []byte) (Metadata, error) { var ret Metadata switch ver.FormatVersion { case 1: - ret = &MetadataV1{} + ret = &metadataV1{} case 2: - ret = &MetadataV2{} + ret = &metadataV2{} default: return nil, ErrInvalidMetadataFormatVersion } @@ -163,10 +699,28 @@ type commonMetadata struct { MetadataLog []MetadataLogEntry `json:"metadata-log"` SortOrderList []SortOrder `json:"sort-orders"` DefaultSortOrderID int `json:"default-sort-order-id"` - Refs map[string]SnapshotRef `json:"refs"` + SnapshotRefs map[string]SnapshotRef `json:"refs"` +} + +func (c *commonMetadata) Ref() SnapshotRef { return c.SnapshotRefs[MainBranch] } +func (c *commonMetadata) Refs() iter.Seq2[string, SnapshotRef] { return maps.All(c.SnapshotRefs) } +func (c *commonMetadata) SnapshotLogs() iter.Seq[SnapshotLogEntry] { + return 
slices.Values(c.SnapshotLog) +} + +func (c *commonMetadata) PreviousFiles() iter.Seq[MetadataLogEntry] { + return slices.Values(c.MetadataLog) } func (c *commonMetadata) Equals(other *commonMetadata) bool { + if other == nil { + return false + } + + if c == other { + return true + } + switch { case c.LastPartitionID == nil && other.LastPartitionID != nil: fallthrough @@ -187,7 +741,7 @@ func (c *commonMetadata) Equals(other *commonMetadata) bool { fallthrough case !maps.Equal(c.Props, other.Props): fallthrough - case !maps.EqualFunc(c.Refs, other.Refs, func(sr1, sr2 SnapshotRef) bool { return sr1.Equals(sr2) }): + case !maps.EqualFunc(c.SnapshotRefs, other.SnapshotRefs, func(sr1, sr2 SnapshotRef) bool { return sr1.Equals(sr2) }): return false } @@ -245,7 +799,7 @@ func (c *commonMetadata) SnapshotByID(id int64) *Snapshot { } func (c *commonMetadata) SnapshotByName(name string) *Snapshot { - if ref, ok := c.Refs[name]; ok { + if ref, ok := c.SnapshotRefs[name]; ok { return c.SnapshotByID(ref.SnapshotID) } return nil @@ -268,6 +822,10 @@ func (c *commonMetadata) SortOrder() SortOrder { return UnsortedSortOrder } +func (c *commonMetadata) DefaultSortOrder() int { + return c.DefaultSortOrderID +} + func (c *commonMetadata) Properties() iceberg.Properties { return c.Props } @@ -284,8 +842,8 @@ func (c *commonMetadata) preValidate() { } if c.CurrentSnapshotID != nil { - if _, ok := c.Refs[MainBranch]; !ok { - c.Refs[MainBranch] = SnapshotRef{ + if _, ok := c.SnapshotRefs[MainBranch]; !ok { + c.SnapshotRefs[MainBranch] = SnapshotRef{ SnapshotID: *c.CurrentSnapshotID, SnapshotRefType: BranchRef, } @@ -296,8 +854,8 @@ func (c *commonMetadata) preValidate() { c.MetadataLog = []MetadataLogEntry{} } - if c.Refs == nil { - c.Refs = make(map[string]SnapshotRef) + if c.SnapshotRefs == nil { + c.SnapshotRefs = make(map[string]SnapshotRef) } if c.SnapshotLog == nil { @@ -370,26 +928,34 @@ func (c *commonMetadata) validate() error { func (c *commonMetadata) Version() int { return 
c.FormatVersion } -type MetadataV1 struct { - Schema iceberg.Schema `json:"schema"` +type metadataV1 struct { + Schema *iceberg.Schema `json:"schema"` Partition []iceberg.PartitionField `json:"partition-spec"` commonMetadata } -func (m *MetadataV1) Equals(other Metadata) bool { - rhs, ok := other.(*MetadataV1) +func (m *metadataV1) Equals(other Metadata) bool { + rhs, ok := other.(*metadataV1) if !ok { return false } - return m.Schema.Equals(&rhs.Schema) && slices.Equal(m.Partition, rhs.Partition) && + if m == rhs { + return true + } + + if m == nil || rhs == nil { + return false + } + + return m.Schema.Equals(rhs.Schema) && slices.Equal(m.Partition, rhs.Partition) && m.commonMetadata.Equals(&rhs.commonMetadata) } -func (m *MetadataV1) preValidate() { - if len(m.SchemaList) == 0 { - m.SchemaList = []*iceberg.Schema{&m.Schema} +func (m *metadataV1) preValidate() { + if len(m.SchemaList) == 0 && m.Schema != nil { + m.SchemaList = []*iceberg.Schema{m.Schema} } if len(m.Specs) == 0 { @@ -416,8 +982,8 @@ func (m *MetadataV1) preValidate() { m.commonMetadata.preValidate() } -func (m *MetadataV1) UnmarshalJSON(b []byte) error { - type Alias MetadataV1 +func (m *metadataV1) UnmarshalJSON(b []byte) error { + type Alias metadataV1 aux := (*Alias)(m) if err := json.Unmarshal(b, aux); err != nil { @@ -428,34 +994,42 @@ func (m *MetadataV1) UnmarshalJSON(b []byte) error { return m.validate() } -func (m *MetadataV1) ToV2() MetadataV2 { +func (m *metadataV1) ToV2() metadataV2 { commonOut := m.commonMetadata commonOut.FormatVersion = 2 if commonOut.UUID.String() == "" { commonOut.UUID = uuid.New() } - return MetadataV2{commonMetadata: commonOut} + return metadataV2{commonMetadata: commonOut} } -type MetadataV2 struct { - LastSequenceNumber int `json:"last-sequence-number"` +type metadataV2 struct { + LastSequenceNumber int64 `json:"last-sequence-number"` commonMetadata } -func (m *MetadataV2) Equals(other Metadata) bool { - rhs, ok := other.(*MetadataV2) +func (m *metadataV2) 
Equals(other Metadata) bool { + rhs, ok := other.(*metadataV2) if !ok { return false } + if m == rhs { + return true + } + + if m == nil || rhs == nil { + return false + } + return m.LastSequenceNumber == rhs.LastSequenceNumber && m.commonMetadata.Equals(&rhs.commonMetadata) } -func (m *MetadataV2) UnmarshalJSON(b []byte) error { - type Alias MetadataV2 +func (m *metadataV2) UnmarshalJSON(b []byte) error { + type Alias metadataV2 aux := (*Alias)(m) if err := json.Unmarshal(b, aux); err != nil { diff --git a/table/metadata_test.go b/table/metadata_internal_test.go similarity index 93% rename from table/metadata_test.go rename to table/metadata_internal_test.go index e268d88..a02ac7f 100644 --- a/table/metadata_test.go +++ b/table/metadata_internal_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package table_test +package table import ( "encoding/json" @@ -23,7 +23,6 @@ import ( "testing" "github.com/apache/iceberg-go" - "github.com/apache/iceberg-go/table" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -112,14 +111,14 @@ const ExampleTableMetadataV1 = `{ }` func TestMetadataV1Parsing(t *testing.T) { - meta, err := table.ParseMetadataBytes([]byte(ExampleTableMetadataV1)) + meta, err := ParseMetadataBytes([]byte(ExampleTableMetadataV1)) require.NoError(t, err) require.NotNil(t, meta) - assert.IsType(t, (*table.MetadataV1)(nil), meta) + assert.IsType(t, (*metadataV1)(nil), meta) assert.Equal(t, 1, meta.Version()) - data := meta.(*table.MetadataV1) + data := meta.(*metadataV1) assert.Equal(t, uuid.MustParse("d20125c8-7284-442c-9aea-15fee620737c"), meta.TableUUID()) assert.Equal(t, "s3://bucket/test/location", meta.Location()) assert.Equal(t, int64(1602638573874), meta.LastUpdatedMillis()) @@ -156,21 +155,21 @@ func TestMetadataV1Parsing(t *testing.T) { assert.Nil(t, meta.SnapshotByID(0)) assert.Nil(t, meta.SnapshotByName("foo")) assert.Zero(t, 
data.DefaultSortOrderID) - assert.Equal(t, table.UnsortedSortOrder, meta.SortOrder()) + assert.Equal(t, UnsortedSortOrder, meta.SortOrder()) } func TestMetadataV2Parsing(t *testing.T) { - meta, err := table.ParseMetadataBytes([]byte(ExampleTableMetadataV2)) + meta, err := ParseMetadataBytes([]byte(ExampleTableMetadataV2)) require.NoError(t, err) require.NotNil(t, meta) - assert.IsType(t, (*table.MetadataV2)(nil), meta) + assert.IsType(t, (*metadataV2)(nil), meta) assert.Equal(t, 2, meta.Version()) - data := meta.(*table.MetadataV2) + data := meta.(*metadataV2) assert.Equal(t, uuid.MustParse("9c12d441-03fe-4693-9a96-a0705ddf69c1"), data.UUID) assert.Equal(t, "s3://bucket/test/location", data.Location()) - assert.Equal(t, 34, data.LastSequenceNumber) + assert.Equal(t, int64(34), data.LastSequenceNumber) assert.Equal(t, int64(1602638573590), data.LastUpdatedMS) assert.Equal(t, 3, data.LastColumnId) assert.Equal(t, 0, data.SchemaList[0].ID) @@ -192,7 +191,7 @@ func TestMetadataV2Parsing(t *testing.T) { } func TestParsingCorrectTypes(t *testing.T) { - var meta table.MetadataV2 + var meta metadataV2 require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2), &meta)) assert.IsType(t, &iceberg.Schema{}, meta.SchemaList[0]) @@ -201,7 +200,7 @@ func TestParsingCorrectTypes(t *testing.T) { } func TestSerializeMetadataV1(t *testing.T) { - var meta table.MetadataV1 + var meta metadataV1 require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV1), &meta)) data, err := json.Marshal(&meta) @@ -212,7 +211,7 @@ func TestSerializeMetadataV1(t *testing.T) { } func TestSerializeMetadataV2(t *testing.T) { - var meta table.MetadataV2 + var meta metadataV2 require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2), &meta)) data, err := json.Marshal(&meta) @@ -243,9 +242,9 @@ func TestInvalidFormatVersion(t *testing.T) { "snapshots": [] }` - _, err := table.ParseMetadataBytes([]byte(metadataInvalidFormat)) + _, err := ParseMetadataBytes([]byte(metadataInvalidFormat)) 
assert.Error(t, err) - assert.ErrorIs(t, err, table.ErrInvalidMetadataFormatVersion) + assert.ErrorIs(t, err, ErrInvalidMetadataFormatVersion) } func TestCurrentSchemaNotFound(t *testing.T) { @@ -278,9 +277,9 @@ func TestCurrentSchemaNotFound(t *testing.T) { "snapshots": [] }` - _, err := table.ParseMetadataBytes([]byte(schemaNotFound)) + _, err := ParseMetadataBytes([]byte(schemaNotFound)) assert.Error(t, err) - assert.ErrorIs(t, err, table.ErrInvalidMetadata) + assert.ErrorIs(t, err, ErrInvalidMetadata) assert.ErrorContains(t, err, "current-schema-id 2 can't be found in any schema") } @@ -322,9 +321,9 @@ func TestSortOrderNotFound(t *testing.T) { "snapshots": [] }` - _, err := table.ParseMetadataBytes([]byte(metadataSortOrderNotFound)) + _, err := ParseMetadataBytes([]byte(metadataSortOrderNotFound)) assert.Error(t, err) - assert.ErrorIs(t, err, table.ErrInvalidMetadata) + assert.ErrorIs(t, err, ErrInvalidMetadata) assert.ErrorContains(t, err, "default-sort-order-id 4 can't be found in [3: [\n2 asc nulls-first\nbucket[4](3) desc nulls-last\n]]") } @@ -358,10 +357,10 @@ func TestSortOrderUnsorted(t *testing.T) { "snapshots": [] }` - var meta table.MetadataV2 + var meta metadataV2 require.NoError(t, json.Unmarshal([]byte(sortOrderUnsorted), &meta)) - assert.Equal(t, table.UnsortedSortOrderID, meta.DefaultSortOrderID) + assert.Equal(t, UnsortedSortOrderID, meta.DefaultSortOrderID) assert.Len(t, meta.SortOrderList, 0) } @@ -394,28 +393,28 @@ func TestInvalidPartitionSpecID(t *testing.T) { "last-partition-id": 1000 }` - var meta table.MetadataV2 + var meta metadataV2 err := json.Unmarshal([]byte(invalidSpecID), &meta) - assert.ErrorIs(t, err, table.ErrInvalidMetadata) + assert.ErrorIs(t, err, ErrInvalidMetadata) assert.ErrorContains(t, err, "default-spec-id 1 can't be found") } func TestV2RefCreation(t *testing.T) { - var meta table.MetadataV2 + var meta metadataV2 require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2), &meta)) maxRefAge := int64(10000000) - 
assert.Equal(t, map[string]table.SnapshotRef{ + assert.Equal(t, map[string]SnapshotRef{ "main": { SnapshotID: 3055729675574597004, - SnapshotRefType: table.BranchRef, + SnapshotRefType: BranchRef, }, "test": { SnapshotID: 3051729675574597004, - SnapshotRefType: table.TagRef, + SnapshotRefType: TagRef, MaxRefAgeMs: &maxRefAge, }, - }, meta.Refs) + }, meta.SnapshotRefs) } func TestV1WriteMetadataToV2(t *testing.T) { @@ -453,11 +452,11 @@ func TestV1WriteMetadataToV2(t *testing.T) { "snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}] }` - meta, err := table.ParseMetadataString(minimalV1Example) + meta, err := ParseMetadataString(minimalV1Example) require.NoError(t, err) - assert.IsType(t, (*table.MetadataV1)(nil), meta) + assert.IsType(t, (*metadataV1)(nil), meta) - metaV2 := meta.(*table.MetadataV1).ToV2() + metaV2 := meta.(*metadataV1).ToV2() metaV2Json, err := json.Marshal(metaV2) require.NoError(t, err) diff --git a/table/requirements.go b/table/requirements.go new file mode 100644 index 0000000..0f3110d --- /dev/null +++ b/table/requirements.go @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package table + +import ( + "fmt" + + "github.com/google/uuid" +) + +// A Requirement is a validation rule that must be satisfied before attempting to +// make and commit changes to a table. Requirements are used to ensure that the +// table is in a valid state before making changes. +type Requirement interface { + // Validate checks that the current table metadata satisfies the requirement. + Validate(Metadata) error +} + +// baseRequirement is a common struct that all requirements embed. It is used to +// identify the type of the requirement. +type baseRequirement struct { + Type string `json:"type"` +} + +type assertCreate struct { + baseRequirement +} + +// AssertCreate creates a requirement that the table does not already exist. +func AssertCreate() Requirement { + return &assertCreate{ + baseRequirement: baseRequirement{Type: "assert-create"}, + } +} + +func (a *assertCreate) Validate(meta Metadata) error { + if meta != nil { + return fmt.Errorf("Table already exists") + } + + return nil +} + +type assertTableUuid struct { + baseRequirement + UUID uuid.UUID `json:"uuid"` +} + +// AssertTableUUID creates a requirement that the table UUID matches the given UUID. +func AssertTableUUID(uuid uuid.UUID) Requirement { + return &assertTableUuid{ + baseRequirement: baseRequirement{Type: "assert-table-uuid"}, + UUID: uuid, + } +} + +func (a *assertTableUuid) Validate(meta Metadata) error { + if meta == nil { + return fmt.Errorf("requirement failed: current table metadata does not exist") + } + + if meta.TableUUID() != a.UUID { + return fmt.Errorf("UUID mismatch: %s != %s", meta.TableUUID(), a.UUID) + } + + return nil +} + +type assertRefSnapshotID struct { + baseRequirement + Ref string `json:"ref"` + SnapshotID *int64 `json:"snapshot-id"` +} + +// AssertRefSnapshotID creates a requirement which ensures that the table branch +// or tag identified by the given ref must reference the given snapshot id. +// If the id is nil, the ref must not already exist. 
+func AssertRefSnapshotID(ref string, id *int64) Requirement { + return &assertRefSnapshotID{ + baseRequirement: baseRequirement{Type: "assert-ref-snapshot-id"}, + Ref: ref, + SnapshotID: id, + } +} + +func (a *assertRefSnapshotID) Validate(meta Metadata) error { + if meta == nil { + return fmt.Errorf("requirement failed: current table metadata does not exist") + } + + var r *SnapshotRef + for name, ref := range meta.Refs() { + if name == a.Ref { + r = &ref + break + } + } + if r == nil { + return fmt.Errorf("requirement failed: branch or tag %s is missing, expected %d", a.Ref, a.SnapshotID) + } + + if a.SnapshotID == nil { + return fmt.Errorf("requirement failed: %s %s was created concurrently", r.SnapshotRefType, a.Ref) + } + + if r.SnapshotID != *a.SnapshotID { + return fmt.Errorf("requirement failed: %s %s has changed: expected id %d, found %d", r.SnapshotRefType, a.Ref, a.SnapshotID, r.SnapshotID) + } + + return nil +} + +type assertLastAssignedFieldId struct { + baseRequirement + LastAssignedFieldID int `json:"last-assigned-field-id"` +} + +// AssertLastAssignedFieldID validates that the table's last assigned column ID +// matches the given id. +func AssertLastAssignedFieldID(id int) Requirement { + return &assertLastAssignedFieldId{ + baseRequirement: baseRequirement{Type: "assert-last-assigned-field-id"}, + LastAssignedFieldID: id, + } +} + +func (a *assertLastAssignedFieldId) Validate(meta Metadata) error { + if meta == nil { + return fmt.Errorf("requirement failed: current table metadata does not exist") + } + + if meta.LastColumnID() != a.LastAssignedFieldID { + return fmt.Errorf("requirement failed: last assigned field id has changed: expected %d, found %d", a.LastAssignedFieldID, meta.LastColumnID()) + } + + return nil +} + +type assertCurrentSchemaId struct { + baseRequirement + CurrentSchemaID int `json:"current-schema-id"` +} + +// AssertCurrentSchemaId creates a requirement that the table's current schema ID +// matches the given id. 
+func AssertCurrentSchemaID(id int) Requirement { + return &assertCurrentSchemaId{ + baseRequirement: baseRequirement{Type: "assert-current-schema-id"}, + CurrentSchemaID: id, + } +} + +func (a *assertCurrentSchemaId) Validate(meta Metadata) error { + if meta == nil { + return fmt.Errorf("requirement failed: current table metadata does not exist") + } + + if meta.CurrentSchema().ID != a.CurrentSchemaID { + return fmt.Errorf("requirement failed: current schema id has changed: expected %d, found %d", a.CurrentSchemaID, meta.CurrentSchema().ID) + } + + return nil +} + +type assertLastAssignedPartitionId struct { + baseRequirement + LastAssignedPartitionID int `json:"last-assigned-partition-id"` +} + +// AssertLastAssignedPartitionID creates a requriement that the table's last assigned partition ID +// matches the given id. +func AssertLastAssignedPartitionID(id int) Requirement { + return &assertLastAssignedPartitionId{ + baseRequirement: baseRequirement{Type: "assert-last-assigned-partition-id"}, + LastAssignedPartitionID: id, + } +} + +func (a *assertLastAssignedPartitionId) Validate(meta Metadata) error { + if meta == nil { + return fmt.Errorf("requirement failed: current table metadata does not exist") + } + + if *meta.LastPartitionSpecID() != a.LastAssignedPartitionID { + return fmt.Errorf("requirement failed: last assigned partition id has changed: expected %d, found %d", a.LastAssignedPartitionID, *meta.LastPartitionSpecID()) + } + + return nil +} + +type assertDefaultSpecId struct { + baseRequirement + DefaultSpecID int `json:"default-spec-id"` +} + +// AssertDefaultSpecID creates a requirement that the table's default partition spec ID +// matches the given id. 
+func AssertDefaultSpecID(id int) Requirement {
+	return &assertDefaultSpecId{
+		baseRequirement: baseRequirement{Type: "assert-default-spec-id"},
+		DefaultSpecID:   id,
+	}
+}
+
+func (a *assertDefaultSpecId) Validate(meta Metadata) error {
+	if meta == nil { // no metadata to validate against
+		return fmt.Errorf("requirement failed: current table metadata does not exist")
+	}
+
+	if meta.DefaultPartitionSpec() != a.DefaultSpecID {
+		return fmt.Errorf("requirement failed: default spec id has changed: expected %d, found %d", a.DefaultSpecID, meta.DefaultPartitionSpec())
+	}
+
+	return nil
+}
+
+type assertDefaultSortOrderId struct {
+	baseRequirement
+	DefaultSortOrderID int `json:"default-sort-order-id"`
+}
+
+// AssertDefaultSortOrderID creates a requirement that the table's default sort order ID
+// matches the given id.
+func AssertDefaultSortOrderID(id int) Requirement {
+	return &assertDefaultSortOrderId{
+		baseRequirement:    baseRequirement{Type: "assert-default-sort-order-id"},
+		DefaultSortOrderID: id,
+	}
+}
+
+func (a *assertDefaultSortOrderId) Validate(meta Metadata) error {
+	if meta == nil { // no metadata to validate against
+		return fmt.Errorf("requirement failed: current table metadata does not exist")
+	}
+
+	if meta.DefaultSortOrder() != a.DefaultSortOrderID {
+		return fmt.Errorf("requirement failed: default sort order id has changed: expected %d, found %d", a.DefaultSortOrderID, meta.DefaultSortOrder())
+	}
+
+	return nil
+}
diff --git a/table/table_test.go b/table/table_test.go
index cde94ab..f09054c 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -41,7 +41,7 @@ func (t *TableTestSuite) SetupSuite() {
 	var mockfs internal.MockFS
 	mockfs.Test(t.T())
 	mockfs.On("Open", "s3://bucket/test/location/uuid.metadata.json"). 
- Return(&internal.MockFile{Contents: bytes.NewReader([]byte(ExampleTableMetadataV2))}, nil) + Return(&internal.MockFile{Contents: bytes.NewReader([]byte(table.ExampleTableMetadataV2))}, nil) defer mockfs.AssertExpectations(t.T()) tbl, err := table.NewFromLocation([]string{"foo"}, "s3://bucket/test/location/uuid.metadata.json", &mockfs) @@ -59,7 +59,7 @@ func (t *TableTestSuite) TestNewTableFromReadFile() { var mockfsReadFile internal.MockFSReadFile mockfsReadFile.Test(t.T()) mockfsReadFile.On("ReadFile", "s3://bucket/test/location/uuid.metadata.json"). - Return([]byte(ExampleTableMetadataV2), nil) + Return([]byte(table.ExampleTableMetadataV2), nil) defer mockfsReadFile.AssertExpectations(t.T()) tbl2, err := table.NewFromLocation([]string{"foo"}, "s3://bucket/test/location/uuid.metadata.json", &mockfsReadFile) diff --git a/table/updates.go b/table/updates.go new file mode 100644 index 0000000..9cfaf49 --- /dev/null +++ b/table/updates.go @@ -0,0 +1,371 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "fmt" + + "github.com/apache/iceberg-go" + "github.com/google/uuid" +) + +// Update represents a change to a table's metadata. 
+type Update interface { + // Action returns the name of the action that the update represents. + Action() string + // Apply applies the update to the given metadata builder. + Apply(*MetadataBuilder) error +} + +// baseUpdate contains the common fields for all updates. It is used to identify the type +// of the update. +type baseUpdate struct { + ActionName string `json:"action"` +} + +func (u *baseUpdate) Action() string { + return u.ActionName +} + +type assignUUIDUpdate struct { + baseUpdate + UUID uuid.UUID `json:"uuid"` +} + +// NewAssignUUIDUpdate creates a new update to assign a UUID to the table metadata. +func NewAssignUUIDUpdate(uuid uuid.UUID) Update { + return &assignUUIDUpdate{ + baseUpdate: baseUpdate{ActionName: "assign-uuid"}, + UUID: uuid, + } +} + +// Apply updates the UUID on the given metadata builder. +func (u *assignUUIDUpdate) Apply(builder *MetadataBuilder) error { + _, err := builder.SetUUID(u.UUID) + return err +} + +type upgradeFormatVersionUpdate struct { + baseUpdate + FormatVersion int `json:"format-version"` +} + +// NewUpgradeFormatVersionUpdate creates a new update that upgrades the format version +// of the table metadata to the given formatVersion. +func NewUpgradeFormatVersionUpdate(formatVersion int) Update { + return &upgradeFormatVersionUpdate{ + baseUpdate: baseUpdate{ActionName: "upgrade-format-version"}, + FormatVersion: formatVersion, + } +} + +// Apply upgrades the format version on the given metadata builder. +func (u *upgradeFormatVersionUpdate) Apply(builder *MetadataBuilder) error { + _, err := builder.SetFormatVersion(u.FormatVersion) + return err +} + +// addSchemaUpdate adds a schema to the table metadata. +type addSchemaUpdate struct { + baseUpdate + Schema *iceberg.Schema `json:"schema"` + LastColumnID int `json:"last-column-id"` + initial bool +} + +// NewAddSchemaUpdate creates a new update that adds the given schema and last column ID to +// the table metadata. 
If the initial flag is set to true, the schema is considered the initial
+// schema of the table, and all previously added schemas in the metadata builder are removed.
+func NewAddSchemaUpdate(schema *iceberg.Schema, lastColumnID int, initial bool) Update {
+	return &addSchemaUpdate{
+		baseUpdate:   baseUpdate{ActionName: "add-schema"},
+		Schema:       schema,
+		LastColumnID: lastColumnID,
+		initial:      initial,
+	}
+}
+
+func (u *addSchemaUpdate) Apply(builder *MetadataBuilder) error {
+	_, err := builder.AddSchema(u.Schema, u.LastColumnID, u.initial) // non-error result is intentionally discarded
+	return err
+}
+
+type setCurrentSchemaUpdate struct {
+	baseUpdate
+	SchemaID int `json:"schema-id"`
+}
+
+// NewSetCurrentSchemaUpdate creates a new update that sets the current schema of the table
+// metadata to the given schema ID.
+func NewSetCurrentSchemaUpdate(id int) Update {
+	return &setCurrentSchemaUpdate{
+		baseUpdate: baseUpdate{ActionName: "set-current-schema"},
+		SchemaID:   id,
+	}
+}
+
+func (u *setCurrentSchemaUpdate) Apply(builder *MetadataBuilder) error {
+	_, err := builder.SetCurrentSchemaID(u.SchemaID)
+	return err
+}
+
+type addPartitionSpecUpdate struct {
+	baseUpdate
+	Spec    *iceberg.PartitionSpec `json:"spec"`
+	initial bool
+}
+
+// NewAddPartitionSpecUpdate creates a new update that adds the given partition spec to the table
+// metadata. If the initial flag is set to true, the spec is considered the initial spec of the table,
+// and all other previously added specs in the metadata builder are removed. 
+func NewAddPartitionSpecUpdate(spec *iceberg.PartitionSpec, initial bool) Update {
+	return &addPartitionSpecUpdate{
+		baseUpdate: baseUpdate{ActionName: "add-spec"},
+		Spec:       spec,
+		initial:    initial,
+	}
+}
+
+func (u *addPartitionSpecUpdate) Apply(builder *MetadataBuilder) error {
+	_, err := builder.AddPartitionSpec(u.Spec, u.initial) // non-error result is intentionally discarded
+	return err
+}
+
+type setDefaultSpecUpdate struct {
+	baseUpdate
+	SpecID int `json:"spec-id"`
+}
+
+// NewSetDefaultSpecUpdate creates a new update that sets the default partition spec of the
+// table metadata to the given spec ID.
+func NewSetDefaultSpecUpdate(id int) Update {
+	return &setDefaultSpecUpdate{
+		baseUpdate: baseUpdate{ActionName: "set-default-spec"},
+		SpecID:     id,
+	}
+}
+
+func (u *setDefaultSpecUpdate) Apply(builder *MetadataBuilder) error {
+	_, err := builder.SetDefaultSpecID(u.SpecID)
+	return err
+}
+
+type addSortOrderUpdate struct {
+	baseUpdate
+	SortOrder *SortOrder `json:"sort-order"`
+	initial   bool
+}
+
+// NewAddSortOrderUpdate creates a new update that adds the given sort order to the table metadata.
+// If the initial flag is set to true, the sort order is considered the initial sort order of the table,
+// and all previously added sort orders in the metadata builder are removed.
+func NewAddSortOrderUpdate(sortOrder *SortOrder, initial bool) Update {
+	return &addSortOrderUpdate{
+		baseUpdate: baseUpdate{ActionName: "add-sort-order"},
+		SortOrder:  sortOrder,
+		initial:    initial,
+	}
+}
+
+func (u *addSortOrderUpdate) Apply(builder *MetadataBuilder) error {
+	_, err := builder.AddSortOrder(u.SortOrder, u.initial)
+	return err
+}
+
+type setDefaultSortOrderUpdate struct {
+	baseUpdate
+	SortOrderID int `json:"sort-order-id"`
+}
+
+// NewSetDefaultSortOrderUpdate creates a new update that sets the default sort order of the table metadata
+// to the given sort order ID. 
+func NewSetDefaultSortOrderUpdate(id int) Update {
+	return &setDefaultSortOrderUpdate{
+		baseUpdate:  baseUpdate{ActionName: "set-default-sort-order"},
+		SortOrderID: id,
+	}
+}
+
+func (u *setDefaultSortOrderUpdate) Apply(builder *MetadataBuilder) error {
+	_, err := builder.SetDefaultSortOrderID(u.SortOrderID)
+	return err
+}
+
+type addSnapshotUpdate struct {
+	baseUpdate
+	Snapshot *Snapshot `json:"snapshot"`
+}
+
+// NewAddSnapshotUpdate creates a new update that adds the given snapshot to the table metadata.
+func NewAddSnapshotUpdate(snapshot *Snapshot) Update {
+	return &addSnapshotUpdate{
+		baseUpdate: baseUpdate{ActionName: "add-snapshot"},
+		Snapshot:   snapshot,
+	}
+}
+
+func (u *addSnapshotUpdate) Apply(builder *MetadataBuilder) error {
+	_, err := builder.AddSnapshot(u.Snapshot)
+	return err
+}
+
+type setSnapshotRefUpdate struct {
+	baseUpdate
+	RefName            string  `json:"ref-name"`
+	RefType            RefType `json:"type"`
+	SnapshotID         int64   `json:"snapshot-id"`
+	MaxRefAgeMs        int64   `json:"max-ref-age-ms,omitempty"`
+	MaxSnapshotAgeMs   int64   `json:"max-snapshot-age-ms,omitempty"`
+	MinSnapshotsToKeep int     `json:"min-snapshots-to-keep,omitempty"`
+}
+
+// NewSetSnapshotRefUpdate creates a new update that sets the given snapshot reference
+// in the table metadata. MaxRefAgeMs, MaxSnapshotAgeMs,
+// and MinSnapshotsToKeep are optional, and any non-positive values are ignored. 
+func NewSetSnapshotRefUpdate( + name string, + snapshotID int64, + refType RefType, + maxRefAgeMs, maxSnapshotAgeMs int64, + minSnapshotsToKeep int, +) Update { + return &setSnapshotRefUpdate{ + baseUpdate: baseUpdate{ActionName: "set-snapshot-ref"}, + RefName: name, + RefType: refType, + SnapshotID: snapshotID, + MaxRefAgeMs: maxRefAgeMs, + MaxSnapshotAgeMs: maxSnapshotAgeMs, + MinSnapshotsToKeep: minSnapshotsToKeep, + } +} + +func (u *setSnapshotRefUpdate) Apply(builder *MetadataBuilder) error { + opts := []setSnapshotRefOption{} + if u.MaxRefAgeMs >= 0 { + opts = append(opts, WithMaxRefAgeMs(u.MaxRefAgeMs)) + } + if u.MaxSnapshotAgeMs >= 0 { + opts = append(opts, WithMaxSnapshotAgeMs(u.MaxSnapshotAgeMs)) + } + if u.MinSnapshotsToKeep >= 0 { + opts = append(opts, WithMinSnapshotsToKeep(u.MinSnapshotsToKeep)) + } + + _, err := builder.SetSnapshotRef( + u.RefName, + u.SnapshotID, + u.RefType, + opts..., + ) + return err +} + +type setLocationUpdate struct { + baseUpdate + Location string `json:"location"` +} + +// NewSetLocationUpdate creates a new update that sets the location of the table metadata. +func NewSetLocationUpdate(loc string) Update { + return &setLocationUpdate{ + baseUpdate: baseUpdate{ActionName: "set-location"}, + Location: loc, + } +} + +func (u *setLocationUpdate) Apply(builder *MetadataBuilder) error { + _, err := builder.SetLoc(u.Location) + return err +} + +type setPropertiesUpdate struct { + baseUpdate + Updates iceberg.Properties `json:"updates"` +} + +// NewSetPropertiesUpdate creates a new update that sets the given properties in the +// table metadata. 
+func NewSetPropertiesUpdate(updates iceberg.Properties) *setPropertiesUpdate { + return &setPropertiesUpdate{ + baseUpdate: baseUpdate{ActionName: "set-properties"}, + Updates: updates, + } +} + +func (u *setPropertiesUpdate) Apply(builder *MetadataBuilder) error { + _, err := builder.SetProperties(u.Updates) + return err +} + +type removePropertiesUpdate struct { + baseUpdate + Removals []string `json:"removals"` +} + +// NewRemovePropertiesUpdate creates a new update that removes properties from the table metadata. +// The properties are identified by their names, and if a property with the given name does not exist, +// it is ignored. +func NewRemovePropertiesUpdate(removals []string) Update { + return &removePropertiesUpdate{ + baseUpdate: baseUpdate{ActionName: "remove-properties"}, + Removals: removals, + } +} + +func (u *removePropertiesUpdate) Apply(builder *MetadataBuilder) error { + _, err := builder.RemoveProperties(u.Removals) + return err +} + +type removeSnapshotsUpdate struct { + baseUpdate + SnapshotIDs []int64 `json:"snapshot-ids"` +} + +// NewRemoveSnapshotsUpdate creates a new update that removes all snapshots from +// the table metadata with the given snapshot IDs. +func NewRemoveSnapshotsUpdate(ids []int64) Update { + return &removeSnapshotsUpdate{ + baseUpdate: baseUpdate{ActionName: "remove-snapshots"}, + SnapshotIDs: ids, + } +} + +func (u *removeSnapshotsUpdate) Apply(builder *MetadataBuilder) error { + return fmt.Errorf("%w: remove-snapshots", iceberg.ErrNotImplemented) +} + +type removeSnapshotRefUpdate struct { + baseUpdate + RefName string `json:"ref-name"` +} + +// NewRemoveSnapshotRefUpdate creates a new update that removes a snapshot reference +// from the table metadata. 
+func NewRemoveSnapshotRefUpdate(ref string) Update { // return Update, not *removeSnapshotRefUpdate: exported funcs should not return unexported types; matches sibling constructors
+	return &removeSnapshotRefUpdate{
+		baseUpdate: baseUpdate{ActionName: "remove-snapshot-ref"},
+		RefName:    ref,
+	}
+}
+
+func (u *removeSnapshotRefUpdate) Apply(builder *MetadataBuilder) error {
+	return fmt.Errorf("%w: remove-snapshot-ref", iceberg.ErrNotImplemented)
+}