Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cataloger rocks initial connect to catalog #976

Merged
merged 2 commits into from
Nov 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions catalog/rocks/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@ package rocks

import (
"context"
"encoding/hex"
"errors"
"fmt"
"regexp"
"strings"
"time"

"github.com/treeverse/lakefs/uri"
)

// Basic Types
Expand Down Expand Up @@ -86,6 +93,7 @@ type Entry struct {
Address string
Metadata Metadata
ETag string
Size int64
}

// EntryRecord holds Path with the associated Entry information
Expand Down Expand Up @@ -137,6 +145,19 @@ type Diff struct {

// Interfaces
type Catalog interface {
// GetRepository returns the Repository metadata object for the given RepositoryID
GetRepository(ctx context.Context, repositoryID RepositoryID) (*Repository, error)

// CreateRepository stores a new Repository under RepositoryID with the given Branch as default branch
CreateRepository(ctx context.Context, repositoryID RepositoryID, stoargeNamespace StorageNamespace, branchID BranchID) (*Repository, error)

// ListRepositories lists repositories starting with 'from' returns maximum 'amount' results and true (boolean)
// in case more results exist
ListRepositories(ctx context.Context, from RepositoryID, amount int) ([]RepositoryRecord, bool, error)

// DeleteRepository deletes the repository
DeleteRepository(ctx context.Context, repositoryID RepositoryID) error

// GetEntry returns entry from repository / reference by path, nil entry is a valid value for tombstone
// returns error if entry does not exist
GetEntry(ctx context.Context, repositoryID RepositoryID, ref Ref, path Path) (*Entry, error)
Expand Down Expand Up @@ -181,6 +202,12 @@ type Catalog interface {
// ErrNothingToCommit in case there is no data in stage
Commit(ctx context.Context, repositoryID RepositoryID, branchID BranchID, committer string, message string, metadata Metadata) (CommitID, error)

// GetCommit returns the Commit metadata object for the given CommitID
GetCommit(ctx context.Context, repositoryID RepositoryID, commitID CommitID) (*Commit, error)

// Dereference returns the commit ID based on 'ref' reference
Dereference(ctx context.Context, repositoryID RepositoryID, ref Ref) (CommitID, error)

// Reset throw all staged data on the repository / branch
Reset(ctx context.Context, repositoryID RepositoryID, branchID BranchID) error

Expand Down Expand Up @@ -348,3 +375,85 @@ type StagingManager interface {
// ListSnapshot returns an iterator to scan the snapshot entries
ListSnapshot(ctx context.Context, repositoryID RepositoryID, branchID BranchID, st StagingToken, from Path) (EntryIterator, error)
}

var (
reValidBranchID = regexp.MustCompile(`^\w[-\w]*$`)
reValidRepositoryID = regexp.MustCompile(`^[a-z0-9][a-z0-9-]{2,62}$`)
)

// Catalog errors
var (
ErrNotFound = errors.New("not found")
ErrInvalidValue = errors.New("invalid value")
ErrInvalidStorageNamespace = fmt.Errorf("storage namespace %w", ErrInvalidValue)
ErrInvalidRepositoryID = fmt.Errorf("repository id %w", ErrInvalidValue)
ErrInvalidBranchID = fmt.Errorf("branch id %w", ErrInvalidValue)
ErrInvalidRef = fmt.Errorf("ref %w", ErrInvalidValue)
ErrInvalidCommitID = fmt.Errorf("commit id %w", ErrInvalidValue)
ErrCommitNotFound = fmt.Errorf("commit %w", ErrNotFound)
)

func NewRepositoryID(id string) (RepositoryID, error) {
if !reValidRepositoryID.MatchString(id) {
return "", ErrInvalidRepositoryID
}
return RepositoryID(id), nil
}

func (id RepositoryID) String() string {
return string(id)
}

func NewStorageNamespace(ns string) (StorageNamespace, error) {
u, err := uri.Parse(ns)
if err != nil || u.Protocol == "" {
return "", ErrInvalidStorageNamespace
}
return StorageNamespace(ns), nil
}

func (ns StorageNamespace) String() string {
return string(ns)
}

func NewBranchID(id string) (BranchID, error) {
if !reValidBranchID.MatchString(id) {
return "", ErrInvalidBranchID
}
return BranchID(id), nil
}

func (id BranchID) String() string {
return string(id)
}

func NewRef(id string) (Ref, error) {
if id == "" || strings.ContainsAny(id, " \t\r\n") {
return "", ErrInvalidRef
}
return Ref(id), nil
}

func (id Ref) String() string {
return string(id)
}

func NewPath(id string) (Path, error) {
return Path(id), nil
}

func (id Path) String() string {
return string(id)
}

func NewCommitID(id string) (CommitID, error) {
_, err := hex.DecodeString(id)
if err != nil {
return "", ErrInvalidCommitID
}
return CommitID(id), nil
}

func (id CommitID) String() string {
return string(id)
}
Loading