Skip to content

Commit

Permalink
Separate Deps and Event in hook triggering (#1463)
Browse files Browse the repository at this point in the history
  • Loading branch information
itaiad200 authored Feb 15, 2021
1 parent 611d9c0 commit 7d63bc5
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 54 deletions.
7 changes: 5 additions & 2 deletions actions/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ const (
EventTypePreMerge EventType = "pre-merge"
)

type Deps struct {
Source Source
Output OutputWriter
}

type Event struct {
EventID uuid.UUID
Source Source
Output OutputWriter
EventType EventType
EventTime time.Time
RepositoryID string
Expand Down
2 changes: 1 addition & 1 deletion actions/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (

// Hook is the abstraction of the basic user-configured runnable building-stone
type Hook interface {
Run(ctx context.Context, event Event) error
Run(ctx context.Context, event Event, writer *HookOutputWriter) error
}

type NewHookFunc func(ActionHook, *Action) (Hook, error)
Expand Down
10 changes: 5 additions & 5 deletions actions/hook_output_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ type HookOutputWriter struct {
Writer OutputWriter
}

func (h *HookOutputWriter) OutputWrite(ctx context.Context, name string, reader io.Reader) error {
outputPath := FormatHookOutputPath(h.RunID, h.ActionName, h.HookID, name)
return h.Writer.OutputWrite(ctx, outputPath, reader)
func (h *HookOutputWriter) OutputWrite(ctx context.Context, reader io.Reader, size int64) error {
outputPath := FormatHookOutputPath(h.RunID, h.ActionName, h.HookID)
return h.Writer.OutputWrite(ctx, outputPath, reader, size)
}

func FormatHookOutputPath(runID, actionName, hookID, name string) string {
return path.Join(runID, actionName, hookID+"_"+name)
func FormatHookOutputPath(runID, actionName, hookID string) string {
return path.Join(runID, actionName, hookID)
}
13 changes: 7 additions & 6 deletions actions/hook_output_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@ import (

func TestHookWriter_OutputWritePath(t *testing.T) {
ctx := context.Background()
contentReader := strings.NewReader("content")
content := "content"
contentReader := strings.NewReader(content)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

hookOutput := actions.FormatHookOutputPath("runID", "actionName", "hookID", "name1")
hookOutput := actions.FormatHookOutputPath("runID", "actionName", "hookID")
writer := mock.NewMockOutputWriter(ctrl)
writer.EXPECT().OutputWrite(ctx, hookOutput, contentReader).Return(nil)
writer.EXPECT().OutputWrite(ctx, hookOutput, contentReader, int64(len(content))).Return(nil)

w := &actions.HookOutputWriter{
RunID: "runID",
ActionName: "actionName",
HookID: "hookID",
Writer: writer,
}
err := w.OutputWrite(ctx, "name1", contentReader)
err := w.OutputWrite(ctx, contentReader, int64(len(content)))
if err != nil {
t.Fatalf("OutputWrite failed with err=%s", err)
}
Expand All @@ -39,7 +40,7 @@ func TestHookWriter_OutputWriteError(t *testing.T) {

errSomeError := errors.New("some error")
writer := mock.NewMockOutputWriter(ctrl)
writer.EXPECT().OutputWrite(gomock.Any(), gomock.Any(), gomock.Any()).Return(errSomeError)
writer.EXPECT().OutputWrite(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(errSomeError)

w := &actions.HookOutputWriter{
RunID: "runID",
Expand All @@ -49,7 +50,7 @@ func TestHookWriter_OutputWriteError(t *testing.T) {
}
ctx := context.Background()
contentReader := strings.NewReader("content")
err := w.OutputWrite(ctx, "name1", contentReader)
err := w.OutputWrite(ctx, contentReader, 10)
if !errors.Is(err, errSomeError) {
t.Fatalf("OutputWrite() err=%v expected=%v", err, errSomeError)
}
Expand Down
2 changes: 1 addition & 1 deletion actions/output_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ import (
)

type OutputWriter interface {
OutputWrite(ctx context.Context, name string, reader io.Reader) error
OutputWrite(ctx context.Context, name string, reader io.Reader, size int64) error
}
17 changes: 7 additions & 10 deletions actions/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ func New(db db.Database) *Service {
}
}

func (s *Service) Run(ctx context.Context, event Event) error {
func (s *Service) Run(ctx context.Context, event Event, deps Deps) error {
// load relevant actions
actions, err := s.loadMatchedActions(ctx, event.Source, MatchSpec{EventType: event.EventType, Branch: event.BranchID})
actions, err := s.loadMatchedActions(ctx, deps.Source, MatchSpec{EventType: event.EventType, Branch: event.BranchID})
if err != nil || len(actions) == 0 {
return err
}
Expand All @@ -41,7 +41,7 @@ func (s *Service) Run(ctx context.Context, event Event) error {
if err != nil {
return nil
}
return s.runTasks(ctx, tasks, event)
return s.runTasks(ctx, tasks, event, deps)
}

func (s *Service) loadMatchedActions(ctx context.Context, source Source, spec MatchSpec) ([]*Action, error) {
Expand Down Expand Up @@ -74,20 +74,17 @@ func (s *Service) allocateTasks(runID string, actions []*Action) ([]*Task, error
return tasks, nil
}

func (s *Service) runTasks(ctx context.Context, hooks []*Task, event Event) error {
func (s *Service) runTasks(ctx context.Context, hooks []*Task, event Event, deps Deps) error {
var g multierror.Group
for _, h := range hooks {
hh := h // pinning
g.Go(func() error {
// set hook's event to have scoped writer
hookEvent := event
hookEvent.Output = &HookOutputWriter{
return hh.Hook.Run(ctx, event, &HookOutputWriter{
RunID: hh.RunID,
ActionName: hh.Action.Name,
HookID: hh.HookID,
Writer: event.Output,
}
return hh.Hook.Run(ctx, hookEvent)
Writer: deps.Output,
})
})
}
return g.Wait().ErrorOrNil()
Expand Down
10 changes: 6 additions & 4 deletions actions/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestManager_RunActions(t *testing.T) {

ctx := context.Background()
testOutputWriter := mock.NewMockOutputWriter(ctrl)
testOutputWriter.EXPECT().OutputWrite(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
testOutputWriter.EXPECT().OutputWrite(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)

testSource := mock.NewMockSource(ctrl)
testSource.EXPECT().List(gomock.Any()).Return([]actions.FileRef{{Path: "act.yaml", Address: "act.addr"}}, nil)
Expand All @@ -41,8 +41,6 @@ hooks:

evtTime := time.Now()
evt := actions.Event{
Source: testSource,
Output: testOutputWriter,
EventType: actions.EventTypePreCommit,
EventTime: evtTime,
RepositoryID: "repoID",
Expand All @@ -52,8 +50,12 @@ hooks:
Committer: "committer",
Metadata: map[string]string{"key": "value"},
}
deps := actions.Deps{
Source: testSource,
Output: testOutputWriter,
}
actionsManager := actions.New(nil)
err := actionsManager.Run(ctx, evt)
err := actionsManager.Run(ctx, evt, deps)
if err != nil {
t.Fatalf("Run() failed with err=%s", err)
}
Expand Down
4 changes: 2 additions & 2 deletions actions/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewWebhook(h ActionHook, action *Action) (Hook, error) {
}, nil
}

func (w *Webhook) Run(ctx context.Context, event Event) error {
func (w *Webhook) Run(ctx context.Context, event Event, writer *HookOutputWriter) error {
// post event information as json to webhook endpoint
eventData, err := w.marshalEventInformation(event)
if err != nil {
Expand All @@ -85,7 +85,7 @@ func (w *Webhook) Run(ctx context.Context, event Event) error {

// log response body if needed
if resp.Body != nil && resp.ContentLength > 0 {
if err := event.Output.OutputWrite(ctx, w.ID, resp.Body); err != nil {
if err := writer.OutputWrite(ctx, resp.Body, resp.ContentLength); err != nil {
return err
}
}
Expand Down
13 changes: 13 additions & 0 deletions catalog/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package catalog
import (
"context"
"fmt"
"io"
"io/ioutil"

"github.com/treeverse/lakefs/actions"
Expand Down Expand Up @@ -59,3 +60,15 @@ func (as *actionsSource) Load(ctx context.Context, fileRef actions.FileRef) ([]b
}
return bytes, nil
}

type actionsWriter struct {
adapter block.Adapter
storageNamespace graveler.StorageNamespace
}

func (aw *actionsWriter) OutputWrite(ctx context.Context, path string, reader io.Reader, size int64) error {
return aw.adapter.WithContext(ctx).Put(block.ObjectPointer{
StorageNamespace: aw.storageNamespace.String(),
Identifier: path,
}, size, reader, block.PutOpts{})
}
58 changes: 35 additions & 23 deletions catalog/entry_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ type Store interface {
graveler.Loader
}

type ActionsClient interface {
Run(ctx context.Context, event actions.Event) error
type Actions interface {
Run(ctx context.Context, event actions.Event, deps actions.Deps) error
}

type EntryCatalog struct {
Actions ActionsClient
Actions Actions
BlockAdapter block.Adapter
Store Store
}
Expand All @@ -101,7 +101,7 @@ type Config struct {
Config *config.Config
DB db.Database
LockDB db.Database
Actions ActionsClient
Actions Actions
}

func NewEntryCatalog(cfg Config) (*EntryCatalog, error) {
Expand Down Expand Up @@ -553,23 +553,29 @@ func (e *EntryCatalog) LoadTags(ctx context.Context, repositoryID graveler.Repos

func (e *EntryCatalog) PreCommitHook(ctx context.Context, eventID uuid.UUID, repositoryRecord graveler.RepositoryRecord, branch graveler.BranchID, commit graveler.Commit) error {
evt := actions.Event{
EventType: actions.EventTypePreCommit,
EventID: eventID,
EventTime: time.Now(),
EventType: actions.EventTypePreCommit,
EventID: eventID,
EventTime: time.Now(),
RepositoryID: repositoryRecord.RepositoryID.String(),
BranchID: branch.String(),
CommitMessage: commit.Message,
Committer: commit.Committer,
Metadata: commit.Metadata,
}
deps := actions.Deps{
Source: &actionsSource{
catalog: e,
adapter: e.BlockAdapter,
repositoryID: repositoryRecord.RepositoryID,
storageNamespace: repositoryRecord.StorageNamespace,
ref: branch.Ref(),
},
RepositoryID: repositoryRecord.RepositoryID.String(),
BranchID: branch.String(),
CommitMessage: commit.Message,
Committer: commit.Committer,
Metadata: commit.Metadata,
Output: &actionsWriter{
adapter: e.BlockAdapter,
storageNamespace: repositoryRecord.StorageNamespace,
},
}
return e.Actions.Run(ctx, evt)
return e.Actions.Run(ctx, evt, deps)
}

func (e *EntryCatalog) PostCommitHook(ctx context.Context, eventID uuid.UUID, repositoryRecord graveler.RepositoryRecord, branch graveler.BranchID, commitRecord graveler.CommitRecord) error {
Expand All @@ -578,24 +584,30 @@ func (e *EntryCatalog) PostCommitHook(ctx context.Context, eventID uuid.UUID, re

func (e *EntryCatalog) PreMergeHook(ctx context.Context, eventID uuid.UUID, repositoryRecord graveler.RepositoryRecord, destination graveler.BranchID, source graveler.Ref, commit graveler.Commit) error {
evt := actions.Event{
EventType: actions.EventTypePreMerge,
EventID: eventID,
EventTime: time.Now(),
EventType: actions.EventTypePreMerge,
EventID: eventID,
EventTime: time.Now(),
RepositoryID: repositoryRecord.RepositoryID.String(),
BranchID: destination.String(),
SourceRef: source.String(),
CommitMessage: commit.Message,
Committer: commit.Committer,
Metadata: commit.Metadata,
}
deps := actions.Deps{
Source: &actionsSource{
catalog: e,
adapter: e.BlockAdapter,
repositoryID: repositoryRecord.RepositoryID,
storageNamespace: repositoryRecord.StorageNamespace,
ref: source,
},
RepositoryID: repositoryRecord.RepositoryID.String(),
BranchID: destination.String(),
SourceRef: source.String(),
CommitMessage: commit.Message,
Committer: commit.Committer,
Metadata: commit.Metadata,
Output: &actionsWriter{
adapter: e.BlockAdapter,
storageNamespace: repositoryRecord.StorageNamespace,
},
}
return e.Actions.Run(ctx, evt)
return e.Actions.Run(ctx, evt, deps)
}

func (e *EntryCatalog) PostMergeHook(ctx context.Context, eventID uuid.UUID, repositoryRecord graveler.RepositoryRecord, destination graveler.BranchID, source graveler.Ref, commitRecord graveler.CommitRecord) error {
Expand Down

0 comments on commit 7d63bc5

Please sign in to comment.