Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl rest catalog + table updates & requirements #146

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
6 changes: 4 additions & 2 deletions catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
ErrNoSuchTable = errors.New("table does not exist")
ErrNoSuchNamespace = errors.New("namespace does not exist")
ErrNamespaceAlreadyExists = errors.New("namespace already exists")
ErrTableAlreadyExists = errors.New("table already exists")
)

// WithAwsConfig sets the AWS configuration for the catalog.
Expand Down Expand Up @@ -147,8 +148,9 @@ type Catalog interface {
ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)
// LoadTable loads a table from the catalog and returns a Table with the metadata.
LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error)
// DropTable tells the catalog to drop the table entirely
DropTable(ctx context.Context, identifier table.Identifier) error
// DropTable tells the catalog to drop the table entirely.
// If the purge flag is set, it is requested to purge the underlying table's data and metadata.
DropTable(ctx context.Context, identifier table.Identifier, purge bool) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should add explanation of what the purge argument does. is that something that was recently added to the REST spec?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete:
tags:
- Catalog API
summary: Drop a table from the catalog
operationId: dropTable
description: Remove a table from the catalog
parameters:
- name: purgeRequested
in: query
required: false
description: Whether the user requested to purge the underlying table's data and metadata
schema:
type: boolean
default: false

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// RenameTable tells the catalog to rename a given table by the identifiers
// provided, and then loads and returns the destination table
RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)
Expand Down
2 changes: 1 addition & 1 deletion catalog/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (c *GlueCatalog) CatalogType() CatalogType {
return Glue
}

func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier) error {
func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier, purge bool) error {
return fmt.Errorf("%w: [Glue Catalog] drop table", iceberg.ErrNotImplemented)
}

Expand Down
262 changes: 228 additions & 34 deletions catalog/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,86 @@ func (e errorResponse) Error() string {
return e.Type + ": " + e.Message
}

type Identifier struct {
Namespace []string `json:"namespace"`
Name string `json:"name"`
}
Comment on lines +87 to +90
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we're going to export this type, we should probably name it RestIdentifier or something equivalent to separate it from other catalog identifier types.


type commitTableResponse struct {
MetadataLoc string `json:"metadata-location"`
RawMetadata json.RawMessage `json:"metadata"`
Metadata table.Metadata `json:"-"`
}

func (t *commitTableResponse) UnmarshalJSON(b []byte) (err error) {
type Alias commitTableResponse
if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
return err
}

t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
return
}

type loadTableResponse struct {
MetadataLoc string `json:"metadata-location"`
RawMetadata json.RawMessage `json:"metadata"`
Config iceberg.Properties `json:"config"`
Metadata table.Metadata `json:"-"`
}

func (t *loadTableResponse) UnmarshalJSON(b []byte) (err error) {
type Alias loadTableResponse
if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
return err
}

t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
return
}

type createTableOption func(*createTableRequest)

func WithLocation(loc string) createTableOption {
return func(req *createTableRequest) {
req.Location = strings.TrimRight(loc, "/")
}
}

func WithPartitionSpec(spec iceberg.PartitionSpec) createTableOption {
return func(req *createTableRequest) {
req.PartitionSpec = spec
}
}

func WithWriteOrder(order table.SortOrder) createTableOption {
return func(req *createTableRequest) {
req.WriteOrder = order
}
}

func WithStageCreate() createTableOption {
return func(req *createTableRequest) {
req.StageCreate = true
}
}

func WithProperties(props iceberg.Properties) createTableOption {
return func(req *createTableRequest) {
req.Props = props
}
}

type createTableRequest struct {
Name string `json:"name"`
Location string `json:"location"`
Schema *iceberg.Schema `json:"schema"`
PartitionSpec iceberg.PartitionSpec `json:"partition-spec"`
WriteOrder table.SortOrder `json:"write-order"`
StageCreate bool `json:"stage-create"`
Props iceberg.Properties `json:"properties"`
}

type oauthTokenResponse struct {
AccessToken string `json:"access_token"`
TokenType string `json:"token_type"`
Expand Down Expand Up @@ -537,6 +617,20 @@ func checkValidNamespace(ident table.Identifier) error {
return nil
}

func (r *RestCatalog) tableFromResponse(identifier []string, metadata table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) {
id := identifier
if r.name != "" {
id = append([]string{r.name}, identifier...)
}

iofs, err := iceio.LoadFS(config, loc)
if err != nil {
return nil, err
}

return table.New(id, metadata, loc, iofs), nil
}

func (r *RestCatalog) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) {
if err := checkValidNamespace(namespace); err != nil {
return nil, err
Expand All @@ -546,12 +640,8 @@ func (r *RestCatalog) ListTables(ctx context.Context, namespace table.Identifier
path := []string{"namespaces", ns, "tables"}

type resp struct {
Identifiers []struct {
Namespace []string `json:"namespace"`
Name string `json:"name"`
} `json:"identifiers"`
Identifiers []Identifier `json:"identifiers"`
Comment on lines -549 to +643
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the identifier type used anywhere other than here? The reason I had done it inline here was because it was only used in this one spot and i didn't want it to get confused with table.Identifier

}

rsp, err := doGet[resp](ctx, r.baseURI, path, r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
if err != nil {
return nil, err
Expand All @@ -573,64 +663,151 @@ func splitIdentForPath(ident table.Identifier) (string, string, error) {
return strings.Join(NamespaceFromIdent(ident), namespaceSeparator), TableNameFromIdent(ident), nil
}

type tblResponse struct {
MetadataLoc string `json:"metadata-location"`
RawMetadata json.RawMessage `json:"metadata"`
Config iceberg.Properties `json:"config"`
Metadata table.Metadata `json:"-"`
}
func (r *RestCatalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...createTableOption) (*table.Table, error) {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return nil, err
}

func (t *tblResponse) UnmarshalJSON(b []byte) (err error) {
type Alias tblResponse
if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
return err
payload := createTableRequest{
Name: tbl,
Schema: schema,
}
for _, o := range opts {
o(&payload)
}

t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
return
ret, err := doPost[createTableRequest, loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables"}, payload,
r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace, http.StatusConflict: ErrTableAlreadyExists})
if err != nil {
return nil, err
}

config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())
for k, v := range ret.Config {
config[k] = v
}
Comment on lines +688 to +690
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why loop instead of just doing maps.Copy (which does the loop internally)


return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, config)
}

func (r *RestCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) {
func (r *RestCatalog) RegisterTable(ctx context.Context, identifier table.Identifier, metadataLoc string) (*table.Table, error) {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return nil, err
}

if props == nil {
props = iceberg.Properties{}
type payload struct {
Name string `json:"name"`
MetadataLoc string `json:"metadata-location"`
}

ret, err := doGet[tblResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl},
r.cl, map[int]error{http.StatusNotFound: ErrNoSuchTable})
ret, err := doPost[payload, loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl},
payload{Name: tbl, MetadataLoc: metadataLoc}, r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace, http.StatusConflict: ErrTableAlreadyExists})
if err != nil {
return nil, err
}

id := identifier
if r.name != "" {
id = append([]string{r.name}, identifier...)
config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())
for k, v := range ret.Config {
config[k] = v
}
Comment on lines +714 to 716
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question, why not maps.Copy(config, ret.Config)?


tblProps := maps.Clone(r.props)
maps.Copy(tblProps, props)
maps.Copy(tblProps, ret.Metadata.Properties())
return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, config)
}

func (r *RestCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return nil, err
}

ret, err := doGet[loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl},
r.cl, map[int]error{http.StatusNotFound: ErrNoSuchTable})
if err != nil {
return nil, err
}

config := maps.Clone(r.props)
maps.Copy(config, props)
maps.Copy(config, ret.Metadata.Properties())
for k, v := range ret.Config {
tblProps[k] = v
config[k] = v
}

return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, config)
}

func (r *RestCatalog) UpdateTable(ctx context.Context, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (*table.Table, error) {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return nil, err
}

iofs, err := iceio.LoadFS(tblProps, ret.MetadataLoc)
ident := Identifier{
Namespace: NamespaceFromIdent(identifier),
Name: tbl,
}
type payload struct {
Identifier Identifier `json:"identifier"`
Requirements []table.Requirement `json:"requirements"`
Updates []table.Update `json:"updates"`
}
ret, err := doPost[payload, commitTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl},
payload{Identifier: ident, Requirements: requirements, Updates: updates}, r.cl,
map[int]error{http.StatusNotFound: ErrNoSuchTable, http.StatusConflict: ErrCommitFailed})
if err != nil {
return nil, err
}
return table.New(id, ret.Metadata, ret.MetadataLoc, iofs), nil

config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())

return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, config)
}

func (r *RestCatalog) DropTable(ctx context.Context, identifier table.Identifier) error {
return fmt.Errorf("%w: [Rest Catalog] drop table", iceberg.ErrNotImplemented)
func (r *RestCatalog) DropTable(ctx context.Context, identifier table.Identifier, purge bool) error {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return err
}

uri := r.baseURI.JoinPath("namespaces", ns, "tables", tbl)
if purge {
v := url.Values{}
v.Set("purgeRequested", "true")
uri.RawQuery = v.Encode()
}

_, err = doDelete[struct{}](ctx, uri, []string{}, r.cl,
map[int]error{http.StatusNotFound: ErrNoSuchTable})

return err
}

func (r *RestCatalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) {
return nil, fmt.Errorf("%w: [Rest Catalog] rename table", iceberg.ErrNotImplemented)
type payload struct {
From Identifier `json:"from"`
To Identifier `json:"to"`
}
f := Identifier{
Namespace: NamespaceFromIdent(from),
Name: TableNameFromIdent(from),
}
t := Identifier{
Namespace: NamespaceFromIdent(to),
Name: TableNameFromIdent(to),
}

_, err := doPost[payload, any](ctx, r.baseURI, []string{"tables", "rename"}, payload{From: f, To: t}, r.cl,
map[int]error{http.StatusNotFound: ErrNoSuchTable})
if err != nil {
return nil, err
}

return r.LoadTable(ctx, to, nil)
}

func (r *RestCatalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error {
Expand Down Expand Up @@ -710,3 +887,20 @@ func (r *RestCatalog) UpdateNamespaceProperties(ctx context.Context, namespace t
return doPost[payload, PropertiesUpdateSummary](ctx, r.baseURI, []string{"namespaces", ns, "properties"},
payload{Remove: removals, Updates: updates}, r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
}

func (r *RestCatalog) CheckNamespaceExists(ctx context.Context, namespace table.Identifier) (bool, error) {
if err := checkValidNamespace(namespace); err != nil {
return false, err
}

_, err := doGet[struct{}](ctx, r.baseURI, []string{"namespaces", strings.Join(namespace, namespaceSeparator)},
r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
if err != nil {
if errors.Is(err, ErrNoSuchNamespace) {
return false, nil
}
return false, err
}

return true, nil
}
Loading
Loading