Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate Deps and Event in hook triggering #1463

Merged
merged 1 commit into from
Feb 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions actions/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ const (
EventTypePreMerge EventType = "pre-merge"
)

type Deps struct {
Source Source
Output OutputWriter
}

type Event struct {
EventID uuid.UUID
Source Source
Output OutputWriter
EventType EventType
EventTime time.Time
RepositoryID string
Expand Down
2 changes: 1 addition & 1 deletion actions/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (

// Hook is the abstraction of the basic user-configured runnable building-stone
type Hook interface {
Run(ctx context.Context, event Event) error
Run(ctx context.Context, event Event, writer *HookOutputWriter) error
}

type NewHookFunc func(ActionHook, *Action) (Hook, error)
Expand Down
10 changes: 5 additions & 5 deletions actions/hook_output_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ type HookOutputWriter struct {
Writer OutputWriter
}

func (h *HookOutputWriter) OutputWrite(ctx context.Context, name string, reader io.Reader) error {
outputPath := FormatHookOutputPath(h.RunID, h.ActionName, h.HookID, name)
return h.Writer.OutputWrite(ctx, outputPath, reader)
func (h *HookOutputWriter) OutputWrite(ctx context.Context, reader io.Reader, size int64) error {
outputPath := FormatHookOutputPath(h.RunID, h.ActionName, h.HookID)
return h.Writer.OutputWrite(ctx, outputPath, reader, size)
}

func FormatHookOutputPath(runID, actionName, hookID, name string) string {
return path.Join(runID, actionName, hookID+"_"+name)
func FormatHookOutputPath(runID, actionName, hookID string) string {
return path.Join(runID, actionName, hookID)
}
13 changes: 7 additions & 6 deletions actions/hook_output_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@ import (

func TestHookWriter_OutputWritePath(t *testing.T) {
ctx := context.Background()
contentReader := strings.NewReader("content")
content := "content"
contentReader := strings.NewReader(content)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

hookOutput := actions.FormatHookOutputPath("runID", "actionName", "hookID", "name1")
hookOutput := actions.FormatHookOutputPath("runID", "actionName", "hookID")
writer := mock.NewMockOutputWriter(ctrl)
writer.EXPECT().OutputWrite(ctx, hookOutput, contentReader).Return(nil)
writer.EXPECT().OutputWrite(ctx, hookOutput, contentReader, int64(len(content))).Return(nil)

w := &actions.HookOutputWriter{
RunID: "runID",
ActionName: "actionName",
HookID: "hookID",
Writer: writer,
}
err := w.OutputWrite(ctx, "name1", contentReader)
err := w.OutputWrite(ctx, contentReader, int64(len(content)))
if err != nil {
t.Fatalf("OutputWrite failed with err=%s", err)
}
Expand All @@ -39,7 +40,7 @@ func TestHookWriter_OutputWriteError(t *testing.T) {

errSomeError := errors.New("some error")
writer := mock.NewMockOutputWriter(ctrl)
writer.EXPECT().OutputWrite(gomock.Any(), gomock.Any(), gomock.Any()).Return(errSomeError)
writer.EXPECT().OutputWrite(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(errSomeError)

w := &actions.HookOutputWriter{
RunID: "runID",
Expand All @@ -49,7 +50,7 @@ func TestHookWriter_OutputWriteError(t *testing.T) {
}
ctx := context.Background()
contentReader := strings.NewReader("content")
err := w.OutputWrite(ctx, "name1", contentReader)
err := w.OutputWrite(ctx, contentReader, 10)
if !errors.Is(err, errSomeError) {
t.Fatalf("OutputWrite() err=%v expected=%v", err, errSomeError)
}
Expand Down
2 changes: 1 addition & 1 deletion actions/output_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ import (
)

type OutputWriter interface {
OutputWrite(ctx context.Context, name string, reader io.Reader) error
OutputWrite(ctx context.Context, name string, reader io.Reader, size int64) error
}
17 changes: 7 additions & 10 deletions actions/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ func New(db db.Database) *Service {
}
}

func (s *Service) Run(ctx context.Context, event Event) error {
func (s *Service) Run(ctx context.Context, event Event, deps Deps) error {
// load relevant actions
actions, err := s.loadMatchedActions(ctx, event.Source, MatchSpec{EventType: event.EventType, Branch: event.BranchID})
actions, err := s.loadMatchedActions(ctx, deps.Source, MatchSpec{EventType: event.EventType, Branch: event.BranchID})
if err != nil || len(actions) == 0 {
return err
}
Expand All @@ -41,7 +41,7 @@ func (s *Service) Run(ctx context.Context, event Event) error {
if err != nil {
return nil
}
return s.runTasks(ctx, tasks, event)
return s.runTasks(ctx, tasks, event, deps)
}

func (s *Service) loadMatchedActions(ctx context.Context, source Source, spec MatchSpec) ([]*Action, error) {
Expand Down Expand Up @@ -74,20 +74,17 @@ func (s *Service) allocateTasks(runID string, actions []*Action) ([]*Task, error
return tasks, nil
}

func (s *Service) runTasks(ctx context.Context, hooks []*Task, event Event) error {
func (s *Service) runTasks(ctx context.Context, hooks []*Task, event Event, deps Deps) error {
var g multierror.Group
for _, h := range hooks {
hh := h // pinning
g.Go(func() error {
// set hook's event to have scoped writer
hookEvent := event
hookEvent.Output = &HookOutputWriter{
return hh.Hook.Run(ctx, event, &HookOutputWriter{
RunID: hh.RunID,
ActionName: hh.Action.Name,
HookID: hh.HookID,
Writer: event.Output,
}
return hh.Hook.Run(ctx, hookEvent)
Writer: deps.Output,
})
})
}
return g.Wait().ErrorOrNil()
Expand Down
10 changes: 6 additions & 4 deletions actions/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestManager_RunActions(t *testing.T) {

ctx := context.Background()
testOutputWriter := mock.NewMockOutputWriter(ctrl)
testOutputWriter.EXPECT().OutputWrite(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
testOutputWriter.EXPECT().OutputWrite(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)

testSource := mock.NewMockSource(ctrl)
testSource.EXPECT().List(gomock.Any()).Return([]actions.FileRef{{Path: "act.yaml", Address: "act.addr"}}, nil)
Expand All @@ -41,8 +41,6 @@ hooks:

evtTime := time.Now()
evt := actions.Event{
Source: testSource,
Output: testOutputWriter,
EventType: actions.EventTypePreCommit,
EventTime: evtTime,
RepositoryID: "repoID",
Expand All @@ -52,8 +50,12 @@ hooks:
Committer: "committer",
Metadata: map[string]string{"key": "value"},
}
deps := actions.Deps{
Source: testSource,
Output: testOutputWriter,
}
actionsManager := actions.New(nil)
err := actionsManager.Run(ctx, evt)
err := actionsManager.Run(ctx, evt, deps)
if err != nil {
t.Fatalf("Run() failed with err=%s", err)
}
Expand Down
4 changes: 2 additions & 2 deletions actions/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewWebhook(h ActionHook, action *Action) (Hook, error) {
}, nil
}

func (w *Webhook) Run(ctx context.Context, event Event) error {
func (w *Webhook) Run(ctx context.Context, event Event, writer *HookOutputWriter) error {
// post event information as json to webhook endpoint
eventData, err := w.marshalEventInformation(event)
if err != nil {
Expand All @@ -85,7 +85,7 @@ func (w *Webhook) Run(ctx context.Context, event Event) error {

// log response body if needed
if resp.Body != nil && resp.ContentLength > 0 {
if err := event.Output.OutputWrite(ctx, w.ID, resp.Body); err != nil {
if err := writer.OutputWrite(ctx, resp.Body, resp.ContentLength); err != nil {
return err
}
}
Expand Down
13 changes: 13 additions & 0 deletions catalog/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package catalog
import (
"context"
"fmt"
"io"
"io/ioutil"

"github.com/treeverse/lakefs/actions"
Expand Down Expand Up @@ -59,3 +60,15 @@ func (as *actionsSource) Load(ctx context.Context, fileRef actions.FileRef) ([]b
}
return bytes, nil
}

type actionsWriter struct {
adapter block.Adapter
storageNamespace graveler.StorageNamespace
}

func (aw *actionsWriter) OutputWrite(ctx context.Context, path string, reader io.Reader, size int64) error {
return aw.adapter.WithContext(ctx).Put(block.ObjectPointer{
StorageNamespace: aw.storageNamespace.String(),
Identifier: path,
}, size, reader, block.PutOpts{})
}
58 changes: 35 additions & 23 deletions catalog/entry_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ type Store interface {
graveler.Loader
}

type ActionsClient interface {
Run(ctx context.Context, event actions.Event) error
type Actions interface {
Run(ctx context.Context, event actions.Event, deps actions.Deps) error
}

type EntryCatalog struct {
Actions ActionsClient
Actions Actions
BlockAdapter block.Adapter
Store Store
}
Expand All @@ -101,7 +101,7 @@ type Config struct {
Config *config.Config
DB db.Database
LockDB db.Database
Actions ActionsClient
Actions Actions
}

func NewEntryCatalog(cfg Config) (*EntryCatalog, error) {
Expand Down Expand Up @@ -553,23 +553,29 @@ func (e *EntryCatalog) LoadTags(ctx context.Context, repositoryID graveler.Repos

func (e *EntryCatalog) PreCommitHook(ctx context.Context, eventID uuid.UUID, repositoryRecord graveler.RepositoryRecord, branch graveler.BranchID, commit graveler.Commit) error {
evt := actions.Event{
EventType: actions.EventTypePreCommit,
EventID: eventID,
EventTime: time.Now(),
EventType: actions.EventTypePreCommit,
EventID: eventID,
EventTime: time.Now(),
RepositoryID: repositoryRecord.RepositoryID.String(),
BranchID: branch.String(),
CommitMessage: commit.Message,
Committer: commit.Committer,
Metadata: commit.Metadata,
}
deps := actions.Deps{
Source: &actionsSource{
catalog: e,
adapter: e.BlockAdapter,
repositoryID: repositoryRecord.RepositoryID,
storageNamespace: repositoryRecord.StorageNamespace,
ref: branch.Ref(),
},
RepositoryID: repositoryRecord.RepositoryID.String(),
BranchID: branch.String(),
CommitMessage: commit.Message,
Committer: commit.Committer,
Metadata: commit.Metadata,
Output: &actionsWriter{
adapter: e.BlockAdapter,
storageNamespace: repositoryRecord.StorageNamespace,
},
}
return e.Actions.Run(ctx, evt)
return e.Actions.Run(ctx, evt, deps)
}

func (e *EntryCatalog) PostCommitHook(ctx context.Context, eventID uuid.UUID, repositoryRecord graveler.RepositoryRecord, branch graveler.BranchID, commitRecord graveler.CommitRecord) error {
Expand All @@ -578,24 +584,30 @@ func (e *EntryCatalog) PostCommitHook(ctx context.Context, eventID uuid.UUID, re

func (e *EntryCatalog) PreMergeHook(ctx context.Context, eventID uuid.UUID, repositoryRecord graveler.RepositoryRecord, destination graveler.BranchID, source graveler.Ref, commit graveler.Commit) error {
evt := actions.Event{
EventType: actions.EventTypePreMerge,
EventID: eventID,
EventTime: time.Now(),
EventType: actions.EventTypePreMerge,
EventID: eventID,
EventTime: time.Now(),
RepositoryID: repositoryRecord.RepositoryID.String(),
BranchID: destination.String(),
SourceRef: source.String(),
CommitMessage: commit.Message,
Committer: commit.Committer,
Metadata: commit.Metadata,
}
deps := actions.Deps{
Source: &actionsSource{
catalog: e,
adapter: e.BlockAdapter,
repositoryID: repositoryRecord.RepositoryID,
storageNamespace: repositoryRecord.StorageNamespace,
ref: source,
},
RepositoryID: repositoryRecord.RepositoryID.String(),
BranchID: destination.String(),
SourceRef: source.String(),
CommitMessage: commit.Message,
Committer: commit.Committer,
Metadata: commit.Metadata,
Output: &actionsWriter{
adapter: e.BlockAdapter,
storageNamespace: repositoryRecord.StorageNamespace,
},
}
return e.Actions.Run(ctx, evt)
return e.Actions.Run(ctx, evt, deps)
}

func (e *EntryCatalog) PostMergeHook(ctx context.Context, eventID uuid.UUID, repositoryRecord graveler.RepositoryRecord, destination graveler.BranchID, source graveler.Ref, commitRecord graveler.CommitRecord) error {
Expand Down