Skip to content

Commit

Permalink
Graveler transform catalog interface (#993)
Browse files Browse the repository at this point in the history
* Catalog as generic key/value with EntryCatalog as a layer between

* split interfaces

* wip

* graveler

* revert mvcc changes

* revert export cmd

* revert some more

* revert part 2

* use iterators at catalog level

* code review changes

* remove unused ListingType

* update comments

* update Log
  • Loading branch information
nopcoder authored Dec 3, 2020
1 parent a57e068 commit b1b3858
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 174 deletions.
93 changes: 47 additions & 46 deletions catalog/rocks/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"

"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/graveler"
"github.com/treeverse/lakefs/logging"
)

type cataloger struct {
Catalog Catalog
EntryCatalog EntryCatalog
log logging.Logger
dummyDedupCh chan *catalog.DedupReport
hooks catalog.CatalogerHooks
Expand All @@ -23,19 +24,19 @@ func NewCataloger() catalog.Cataloger {

// CreateRepository creates a new repository pointing to 'storageNamespace' (ex: s3://bucket1/repo) with default branch name 'branch'
func (c *cataloger) CreateRepository(ctx context.Context, repository string, storageNamespace string, branch string) (*catalog.Repository, error) {
repositoryID, err := NewRepositoryID(repository)
repositoryID, err := graveler.NewRepositoryID(repository)
if err != nil {
return nil, err
}
storageNS, err := NewStorageNamespace(storageNamespace)
storageNS, err := graveler.NewStorageNamespace(storageNamespace)
if err != nil {
return nil, err
}
branchID, err := NewBranchID(branch)
branchID, err := graveler.NewBranchID(branch)
if err != nil {
return nil, err
}
repo, err := c.Catalog.CreateRepository(ctx, repositoryID, storageNS, branchID)
repo, err := c.EntryCatalog.CreateRepository(ctx, repositoryID, storageNS, branchID)
if err != nil {
return nil, err
}
Expand All @@ -50,11 +51,11 @@ func (c *cataloger) CreateRepository(ctx context.Context, repository string, sto

// GetRepository gets repository information
func (c *cataloger) GetRepository(ctx context.Context, repository string) (*catalog.Repository, error) {
repositoryID, err := NewRepositoryID(repository)
repositoryID, err := graveler.NewRepositoryID(repository)
if err != nil {
return nil, err
}
repo, err := c.Catalog.GetRepository(ctx, repositoryID)
repo, err := c.EntryCatalog.GetRepository(ctx, repositoryID)
if err != nil {
return nil, err
}
Expand All @@ -69,11 +70,11 @@ func (c *cataloger) GetRepository(ctx context.Context, repository string) (*cata

// DeleteRepository deletes a repository
func (c *cataloger) DeleteRepository(ctx context.Context, repository string) error {
repositoryID, err := NewRepositoryID(repository)
repositoryID, err := graveler.NewRepositoryID(repository)
if err != nil {
return err
}
return c.Catalog.DeleteRepository(ctx, repositoryID)
return c.EntryCatalog.DeleteRepository(ctx, repositoryID)
}

// ListRepositories lists repository information; the returned bool is true when more repositories can be listed.
Expand All @@ -83,23 +84,23 @@ func (c *cataloger) ListRepositories(ctx context.Context, limit int, after strin
}

func (c *cataloger) CreateBranch(ctx context.Context, repository string, branch string, sourceBranch string) (*catalog.CommitLog, error) {
repositoryID, err := NewRepositoryID(repository)
repositoryID, err := graveler.NewRepositoryID(repository)
if err != nil {
return nil, err
}
branchID, err := NewBranchID(branch)
branchID, err := graveler.NewBranchID(branch)
if err != nil {
return nil, err
}
sourceRef, err := NewRef(sourceBranch)
sourceRef, err := graveler.NewRef(sourceBranch)
if err != nil {
return nil, err
}
newBranch, err := c.Catalog.CreateBranch(ctx, repositoryID, branchID, sourceRef)
newBranch, err := c.EntryCatalog.CreateBranch(ctx, repositoryID, branchID, sourceRef)
if err != nil {
return nil, err
}
commit, err := c.Catalog.GetCommit(ctx, repositoryID, newBranch.CommitID)
commit, err := c.EntryCatalog.GetCommit(ctx, repositoryID, newBranch.CommitID)
if err != nil {
return nil, err
}
Expand All @@ -116,44 +117,44 @@ func (c *cataloger) CreateBranch(ctx context.Context, repository string, branch
}

func (c *cataloger) DeleteBranch(ctx context.Context, repository string, branch string) error {
repositoryID, err := NewRepositoryID(repository)
repositoryID, err := graveler.NewRepositoryID(repository)
if err != nil {
return err
}
branchID, err := NewBranchID(branch)
branchID, err := graveler.NewBranchID(branch)
if err != nil {
return err
}
return c.Catalog.DeleteBranch(ctx, repositoryID, branchID)
return c.EntryCatalog.DeleteBranch(ctx, repositoryID, branchID)
}

func (c *cataloger) ListBranches(ctx context.Context, repository string, prefix string, limit int, after string) ([]*catalog.Branch, bool, error) {
panic("not implemented") // TODO: Implement
}

// BranchExists reports whether branch exists in repository.
//
// Both identifiers are validated first; a validation failure is returned as
// (false, err). The branch is then looked up through the EntryCatalog and the
// result is true only when that lookup succeeds.
//
// NOTE(review): any lookup error, including a "branch not found" error, is
// propagated to the caller as (false, err). Callers that expect (false, nil)
// for a missing branch need the not-found sentinel translated here — confirm
// which sentinel GetBranch returns before adding that.
func (c *cataloger) BranchExists(ctx context.Context, repository string, branch string) (bool, error) {
	repositoryID, err := graveler.NewRepositoryID(repository)
	if err != nil {
		return false, err
	}
	branchID, err := graveler.NewBranchID(branch)
	if err != nil {
		return false, err
	}
	// A failed lookup means the branch cannot be reported as existing.
	// Returning `err != nil` here would invert the answer: true on failure,
	// false on success.
	if _, err := c.EntryCatalog.GetBranch(ctx, repositoryID, branchID); err != nil {
		return false, err
	}
	return true, nil
}

func (c *cataloger) GetBranchReference(ctx context.Context, repository string, branch string) (string, error) {
repositoryID, err := NewRepositoryID(repository)
repositoryID, err := graveler.NewRepositoryID(repository)
if err != nil {
return "", err
}
branchID, err := NewBranchID(branch)
branchID, err := graveler.NewBranchID(branch)
if err != nil {
return "", err
}
b, err := c.Catalog.GetBranch(ctx, repositoryID, branchID)
b, err := c.EntryCatalog.GetBranch(ctx, repositoryID, branchID)
if err != nil {
return "", err
}
Expand All @@ -167,19 +168,19 @@ func (c *cataloger) ResetBranch(ctx context.Context, repository string, branch s
// GetEntry returns the current entry for path in repository branch reference. Returns
// the entry with ExpiredError if it has expired from underlying storage.
func (c *cataloger) GetEntry(ctx context.Context, repository string, reference string, path string, _ catalog.GetEntryParams) (*catalog.Entry, error) {
repositoryID, err := NewRepositoryID(repository)
repositoryID, err := graveler.NewRepositoryID(repository)
if err != nil {
return nil, err
}
ref, err := NewRef(reference)
ref, err := graveler.NewRef(reference)
if err != nil {
return nil, err
}
p, err := NewPath(path)
if err != nil {
return nil, err
}
ent, err := c.Catalog.GetEntry(ctx, repositoryID, ref, p)
ent, err := c.EntryCatalog.GetEntry(ctx, repositoryID, ref, p)
if err != nil {
return nil, err
}
Expand All @@ -195,33 +196,33 @@ func (c *cataloger) GetEntry(ctx context.Context, repository string, reference s
}

func (c *cataloger) CreateEntry(ctx context.Context, repository string, branch string, entry catalog.Entry, _ catalog.CreateEntryParams) error {
repositoryID, err := NewRepositoryID(repository)
repositoryID, err := graveler.NewRepositoryID(repository)
if err != nil {
return err
}
branchID, err := NewBranchID(branch)
branchID, err := graveler.NewBranchID(branch)
if err != nil {
return err
}
p, err := NewPath(entry.Path)
if err != nil {
return err
}
ent := Entry{
ent := &Entry{
Address: entry.PhysicalAddress,
Metadata: map[string]string(entry.Metadata),
ETag: entry.Checksum,
Size: entry.Size,
}
return c.Catalog.SetEntry(ctx, repositoryID, branchID, p, ent)
return c.EntryCatalog.SetEntry(ctx, repositoryID, branchID, p, ent)
}

func (c *cataloger) CreateEntries(ctx context.Context, repository string, branch string, entries []catalog.Entry) error {
repositoryID, err := NewRepositoryID(repository)
repositoryID, err := graveler.NewRepositoryID(repository)
if err != nil {
return err
}
branchID, err := NewBranchID(branch)
branchID, err := graveler.NewBranchID(branch)
if err != nil {
return err
}
Expand All @@ -230,33 +231,33 @@ func (c *cataloger) CreateEntries(ctx context.Context, repository string, branch
if err != nil {
return err
}
ent := Entry{
ent := &Entry{
Address: entry.PhysicalAddress,
Metadata: map[string]string(entry.Metadata),
ETag: entry.Checksum,
Size: entry.Size,
}
if err := c.Catalog.SetEntry(ctx, repositoryID, branchID, p, ent); err != nil {
if err := c.EntryCatalog.SetEntry(ctx, repositoryID, branchID, p, ent); err != nil {
return err
}
}
return nil
}

func (c *cataloger) DeleteEntry(ctx context.Context, repository string, branch string, path string) error {
repositoryID, err := NewRepositoryID(repository)
repositoryID, err := graveler.NewRepositoryID(repository)
if err != nil {
return err
}
branchID, err := NewBranchID(branch)
branchID, err := graveler.NewBranchID(branch)
if err != nil {
return err
}
p, err := NewPath(path)
if err != nil {
return err
}
return c.Catalog.DeleteEntry(ctx, repositoryID, branchID, p)
return c.EntryCatalog.DeleteEntry(ctx, repositoryID, branchID, p)
}

func (c *cataloger) ListEntries(ctx context.Context, repository string, reference string, prefix string, after string, delimiter string, limit int) ([]*catalog.Entry, bool, error) {
Expand Down Expand Up @@ -306,15 +307,15 @@ func (c *cataloger) DedupReportChannel() chan *catalog.DedupReport {
}

func (c *cataloger) Commit(ctx context.Context, repository string, branch string, message string, committer string, metadata catalog.Metadata) (*catalog.CommitLog, error) {
repositoryID, err := NewRepositoryID(repository)
repositoryID, err := graveler.NewRepositoryID(repository)
if err != nil {
return nil, err
}
branchID, err := NewBranchID(branch)
branchID, err := graveler.NewBranchID(branch)
if err != nil {
return nil, err
}
commitID, err := c.Catalog.Commit(ctx, repositoryID, branchID, committer, message, map[string]string(metadata))
commitID, err := c.EntryCatalog.Commit(ctx, repositoryID, branchID, committer, message, map[string]string(metadata))
if err != nil {
return nil, err
}
Expand All @@ -325,9 +326,9 @@ func (c *cataloger) Commit(ctx context.Context, repository string, branch string
Metadata: metadata,
}
// in order to return commit log we need the commit creation time and parents
commit, err := c.Catalog.GetCommit(ctx, repositoryID, commitID)
commit, err := c.EntryCatalog.GetCommit(ctx, repositoryID, commitID)
if err != nil {
return catalogCommitLog, ErrCommitNotFound
return catalogCommitLog, graveler.ErrCommitNotFound
}
for _, parent := range commit.Parents {
catalogCommitLog.Parents = append(catalogCommitLog.Parents, parent.String())
Expand All @@ -337,19 +338,19 @@ func (c *cataloger) Commit(ctx context.Context, repository string, branch string
}

func (c *cataloger) GetCommit(ctx context.Context, repository string, reference string) (*catalog.CommitLog, error) {
repositoryID, err := NewRepositoryID(repository)
repositoryID, err := graveler.NewRepositoryID(repository)
if err != nil {
return nil, err
}
ref, err := NewRef(reference)
ref, err := graveler.NewRef(reference)
if err != nil {
return nil, err
}
commitID, err := c.Catalog.Dereference(ctx, repositoryID, ref)
commitID, err := c.EntryCatalog.Dereference(ctx, repositoryID, ref)
if err != nil {
return nil, err
}
commit, err := c.Catalog.GetCommit(ctx, repositoryID, commitID)
commit, err := c.EntryCatalog.GetCommit(ctx, repositoryID, commitID)
if err != nil {
return nil, err
}
Expand Down
65 changes: 65 additions & 0 deletions catalog/rocks/entry_catalog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package rocks

import (
"context"
"time"

"github.com/treeverse/lakefs/graveler"
)

// Path is the key under which an Entry is addressed in EntryCatalog operations.
type Path string

// Entry represents the metadata of a given object (modified date, physical address, etc).
type Entry struct {
// LastModified is the object's modification time.
LastModified time.Time
// Address is the object's physical address.
Address string
// Metadata holds the object's key/value metadata.
Metadata graveler.Metadata
// ETag is the object's checksum.
ETag string
// Size is the object's size — presumably in bytes; confirm against callers.
Size int64
}

// EntryRecord pairs an Entry with the Path it is addressed by.
type EntryRecord struct {
// Path is the key of the record.
Path Path
// Entry is embedded by pointer, so its fields are promoted onto EntryRecord.
// NOTE(review): a nil Entry presumably denotes a tombstone, as for
// EntryCatalog.GetEntry/SetEntry — confirm.
*Entry
}

// EntryListing is the value type yielded by an EntryListingIterator.
type EntryListing struct {
// CommonPrefix presumably marks an item that stands for a common prefix
// (produced when a delimiter is passed to ListEntries) rather than for a
// single entry — confirm against the iterator implementation.
CommonPrefix bool
// Path and Entry are both embedded, so the listing can be used directly as
// a Path and exposes the Entry fields.
Path
*Entry
}

// EntryListingIterator iterates over EntryListing values; it is the result
// type of EntryCatalog.ListEntries.
//
// NOTE(review): the per-method contracts below are inferred from the method
// names and signatures only — confirm against an implementation.
type EntryListingIterator interface {
// Next presumably advances to the next item and reports whether one is available.
Next() bool
// SeekGE presumably positions the iterator at the first item whose path is
// greater than or equal to id and reports whether such an item exists.
SeekGE(id Path) bool
// Value returns the current item.
Value() *EntryListing
// Err returns the error encountered during iteration, if any.
Err() error
// Close releases the iterator; presumably it must be called when done.
Close()
}

// EntryCatalog extends graveler.VersionController with operations that
// address Entry values by Path within a repository.
type EntryCatalog interface {
graveler.VersionController

// GetEntry returns entry from repository / reference by path, nil entry is a valid entry for tombstone
// returns error if entry does not exist
GetEntry(ctx context.Context, repositoryID graveler.RepositoryID, ref graveler.Ref, path Path) (*Entry, error)

// SetEntry stores entry on repository / branch by path. nil entry is a valid entry for tombstone
SetEntry(ctx context.Context, repositoryID graveler.RepositoryID, branchID graveler.BranchID, path Path, entry *Entry) error

// DeleteEntry deletes entry from repository / branch by path
DeleteEntry(ctx context.Context, repositoryID graveler.RepositoryID, branchID graveler.BranchID, path Path) error

// ListEntries lists entries on repository / ref, filtered by prefix, starting from path 'from'.
// When 'delimiter' is set the listing will include common prefixes based on the delimiter
ListEntries(ctx context.Context, repositoryID graveler.RepositoryID, ref graveler.Ref, prefix, from, delimiter Path) (EntryListingIterator, error)
}

// NewPath converts id into a Path. No validation is performed, so the
// returned error is always nil.
func NewPath(id string) (Path, error) {
	path := Path(id)
	return path, nil
}

// String returns the path as a plain string.
func (p Path) String() string {
	s := string(p)
	return s
}
Loading

0 comments on commit b1b3858

Please sign in to comment.