Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Find expired and active commits #2069

Merged
merged 31 commits into from
Jun 21, 2021
Merged

Find expired and active commits #2069

merged 31 commits into from
Jun 21, 2021

Conversation

johnnyaug
Copy link
Contributor

@johnnyaug johnnyaug commented Jun 6, 2021

Closes #2067

  1. API endpoint to set/get GC rules for a repository. Rules are saved in S3 under under _lakefs/retention/rules.
  2. API endpoint to prepare a set of active/expired commits according to the GC rules. The result is saved as a CSV file under _lakefs/retention/commits.

@johnnyaug johnnyaug changed the title Feature/find active commits Find expired and active commits Jun 15, 2021
@johnnyaug johnnyaug marked this pull request as ready for review June 15, 2021 14:41
@johnnyaug johnnyaug requested review from nopcoder and guy-har June 15, 2021 14:41
api/swagger.yml Outdated
GarbageCollectionPrepareRequest:
type: object
properties:
previous_result_path:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add an example? Is it an object-store path or a lakeFS one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this to run-id and added example

GarbageCollectionRules:
type: object
properties:
default_retention_days:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the relation between a rule's retention_days and the global default_retention_days?

tags:
- retention
operationId: getGarbageCollectionRules
responses:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing 404 for missing repo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

application/json:
schema:
$ref: "#/components/schemas/GarbageCollectionRules"
responses:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing 404.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

func NewRuleManager(blockAdapter block.Adapter, blockStoragePrefix string) *RuleManager {
return &RuleManager{blockAdapter: blockAdapter, configurationFileSuffix: fmt.Sprintf("/%s/retention/rules/config.json", blockStoragePrefix)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set retention/rules/config.json as const.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -0,0 +1,214 @@
package ref
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great tests!

Comment on lines 1217 to 1219
for _, commitRow := range previousCommits {
previouslyExpiredCommits = append(previouslyExpiredCommits, graveler.CommitID(commitRow[1]))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not all expired, are they?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, fixing

return c.Store.SetRetentionRules(ctx, graveler.RepositoryID(repositoryID), rules)
}

func (c *Catalog) PrepareExpiredCommits(ctx context.Context, repository string, previousResultPath string) (string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe extract the csv parsing to a designated parser?
It should also handle the schema of this file which is currently static.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return "", err
}
}
csvWriter.Flush()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you also close it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing to close here: we are writing to a strings.Builder

csvWriter.Flush()
commitsStr := b.String()
runID := uuid.New().String()
path := fmt.Sprintf("_lakefs/retention/commits/run_id=%s/commits.csv", runID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_lakefs prefix should be passed to you from the config.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add the /retention/commits/ part too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done the first one

Copy link
Contributor

@guy-har guy-har left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks Great!!
The code look great, liked the tests.

I have a few concerns about the API paths, the GarbageCollectionPrepareRequest contains the full path, I think it should only contain the runID. the prepare path could be a bit confusing. There are places the name garbage collection is used and other where the name retention is used, which is OK, but got me thinking that, when working on lifecycle we might decide to change the API (or align it somehow), at that point it will be a breaking change. I suggest we leave the API for setting and getting configurations for later, what do you think?

api/swagger.yml Outdated
Comment on lines 823 to 825
previous_result_path:
type: string
description: path to the result of a previous successful GC job
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't run ID enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

api/swagger.yml Outdated
Comment on lines 829 to 833
properties:
path:
type: string
description: path to a dataset of commits

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

api/swagger.yml Outdated


/repositories/{repository}/gc/prepare:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change path to be more specific
prepare could also mean prepare the expired addresses data

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 373 to 377

GetRetentionRules(ctx context.Context, repositoryID RepositoryID) (*RetentionRules, error)

SetRetentionRules(ctx context.Context, repositoryID RepositoryID, rules *RetentionRules) error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this retention rules or garbage collection rules?
In the future we will also have lifecycle rules.
Not sure if it will be together but it could get a bit confusing.

Maybe we should give it another thought before adding the API routes...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to garbage collection rules

@@ -856,6 +879,32 @@ func (g *Graveler) GetStagingToken(ctx context.Context, repositoryID RepositoryI
return &branch.StagingToken, nil
}

func (g *Graveler) GetRetentionRules(ctx context.Context, repositoryID RepositoryID) (*RetentionRules, error) {
// TODO use "_lakefs" from configuration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still relevant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

}

func (g *Graveler) SetRetentionRules(ctx context.Context, repositoryID RepositoryID, rules *RetentionRules) error {
// TODO use "_lakefs" from configuration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still relevant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

csvWriter.Flush()
commitsStr := b.String()
runID := uuid.New().String()
path := fmt.Sprintf("_lakefs/retention/commits/run_id=%s/commits.csv", runID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use hardcoded path, get _lakefs from the configurations. maybe even use a function to get the path

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

func (c *Controller) PrepareGarbageCollectionCommits(w http.ResponseWriter, r *http.Request, body PrepareGarbageCollectionCommitsJSONRequestBody, repository string) {
if !c.authorize(w, r, []permissions.Permission{
{
Action: permissions.ListObjectsAction,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be same as log commits ?
ReadBranchAction

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I need this permission for all branches, and not just one, I'm adding a dedicated permission for this: retention:PrepareGarbageCollectionCommits

Copy link
Contributor

@nopcoder nopcoder left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is that prepare operation can take time to perform and can cause the operation to timeout - do we have numbers on how long it can take to process the request?

api/swagger.yml Outdated
Comment on lines 827 to 832
GarbageCollectionCommits:
type: object
properties:
path:
type: string
description: path to a dataset of commits
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add the type/format of the collection as a parameter or in the description?

api/swagger.yml Show resolved Hide resolved
api/swagger.yml Show resolved Hide resolved
Comment on lines 61 to 62
GetGarbageCollectionRules = "retention:GetGarbageCollectionRules"
SetGarbageCollectionRules = "retention:SetGarbageCollectionRules"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't forget to add documentation https://docs.lakefs.io/reference/authorization.html - can be different PR as we can merge this one before the release

func (c *Controller) SetGarbageCollectionRules(w http.ResponseWriter, r *http.Request, body SetGarbageCollectionRulesJSONRequestBody, repository string) {
if !c.authorize(w, r, []permissions.Permission{
{
Action: permissions.GetGarbageCollectionRules,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SetGarbageCollectionRules

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return "", err
}
csvReader := csv.NewReader(previousRunReader)
previousCommits, err := csvReader.ReadAll()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer iteration as we don't keep the array of record and just append them into our own structure

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done (and moved to another place)

return "", err
}
}
csvWriter.Flush()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to call Error() after this call to capture possible errors

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

b := &strings.Builder{}
csvWriter := csv.NewWriter(b)
for _, commitID := range expiredCommits {
err = csvWriter.Write([]string{string(commitID), strconv.FormatBool(true)})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use "true" or capture the value outside the loops

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
}
for _, commitID := range activeCommits {
err = csvWriter.Write([]string{string(commitID), strconv.FormatBool(false)})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use "false" or capture the value outside the loops

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if err != nil {
return "", err
}
previouslyExpiredCommits := make([]graveler.CommitID, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need zero allocated slice for this one

Suggested change
previouslyExpiredCommits := make([]graveler.CommitID, 0)
var previouslyExpiredCommits []graveler.CommitID

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done (and moved to another place)

Copy link
Contributor

@nopcoder nopcoder left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just remove the user a var for an empty struct - can just pass it

"github.com/treeverse/lakefs/pkg/graveler"
)

var empty struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for this one

@johnnyaug johnnyaug requested a review from nopcoder June 20, 2021 12:52
@johnnyaug johnnyaug merged commit af759c6 into master Jun 21, 2021
@johnnyaug johnnyaug deleted the feature/find_active_commits branch June 21, 2021 07:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[GC] Logic for finding the set of active/expired commits
4 participants