Skip to content

Commit

Permalink
feat: webhook channel query param (#40)
Browse files Browse the repository at this point in the history
* Events can now be received at both `/events` or `/events/<channel-name>` endpoints

* Resolve conversation

* move gorilla into webhook package

Signed-off-by: Nico Braun <rainbowstack@gmail.com>

* fix golangci lint issues

Signed-off-by: Nico Braun <rainbowstack@gmail.com>

---------

Signed-off-by: Nico Braun <rainbowstack@gmail.com>
Co-authored-by: Nico Braun <rainbowstack@gmail.com>
  • Loading branch information
Archisman-Mridha and bluebrown authored Mar 30, 2024
1 parent 76e036a commit 806fde4
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ webhook endpoint, whenever a new image is pushed.
## Webhook

In servermode Kobold exposes a webhook endpoint at
`$KOBOLD_ADDR_WEBHOOK/events?chan=<channel>`. The channel is used to identify
`$KOBOLD_ADDR_WEBHOOK/events/<channel>`. The channel is used to identify
the pipelines to run. It accepts any content in the body since it is the
[channels decoder's](#decoders) responsibility to parse the body.

Expand Down
2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func run(ctx context.Context, args []string, env []string) error {

g.Go(func() error {
whmux := http.NewServeMux()
whmux.Handle(prefix+"/events", http.StripPrefix(prefix, webhook.New(sched)))
whmux.Handle(prefix+"/", http.StripPrefix(prefix, webhook.New(sched)))
return listenAndServeContext(ctx, "webhook", webhookAddr, whmux)
})

Expand Down
45 changes: 37 additions & 8 deletions http/webhook/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,59 @@ package webhook

import (
"bytes"
"context"
"log/slog"
"net/http"

"github.com/bluebrown/kobold/task"
"github.com/gorilla/mux"
)

const warnQueryDeprecated = `sending channel name using query parameters is deprecated and will be removed in future releases. Please use path parameters instead.`

type scheduler interface {
Schedule(ctx context.Context, chn string, data []byte) error
}

type Webhook struct {
s *task.Scheduler
s scheduler
r *mux.Router
}

func New(s *task.Scheduler) *Webhook {
return &Webhook{
s: s,
}
func New(s scheduler) *Webhook {
wh := &Webhook{s: s, r: mux.NewRouter()}
wh.r.HandleFunc("/events/{chan}", wh.handleEvent)
wh.r.HandleFunc("/events", wh.handleEvent).Queries("chan", "{chan}")
return wh
}

func (api *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
api.r.ServeHTTP(w, r)
}

func (api *Webhook) handleEvent(w http.ResponseWriter, r *http.Request) {
var buf bytes.Buffer
if _, err := buf.ReadFrom(r.Body); err != nil {
http.Error(w, "unable to read body", http.StatusBadRequest)
return
}
if err := api.s.Schedule(r.Context(), r.URL.Query().Get("chan"), buf.Bytes()); err != nil {

chn := mux.Vars(r)["chan"]
logger := slog.With("chan", chn)

if err := api.s.Schedule(r.Context(), chn, buf.Bytes()); err != nil {
http.Error(w, "internal error", http.StatusInternalServerError)
logger.Error("schedule task", "error", err)
return
}

if !r.URL.Query().Has("chan") {
w.WriteHeader(http.StatusAccepted)
return
}
w.WriteHeader(http.StatusAccepted)

logger.Warn("webhook event", "msg", warnQueryDeprecated)

if _, err := w.Write([]byte(warnQueryDeprecated)); err != nil {
logger.Error("write response body", "error", err)
}
}
74 changes: 74 additions & 0 deletions http/webhook/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package webhook

import (
"context"
"net/http/httptest"
"reflect"
"strings"
"testing"
)

func TestWebhook_ServeHTTP(t *testing.T) {
t.Parallel()
tests := []struct {
name string
giveURL string
giveBody string
wantStatus int
wantResBody string
wantChan string
}{
{
name: "query parameter",
giveURL: "/events?chan=query",
giveBody: "hello",
wantStatus: 200,
wantResBody: warnQueryDeprecated,
wantChan: "query",
},
{
name: "path parameter",
giveURL: "/events/path",
giveBody: "hello",
wantStatus: 202,
wantChan: "path",
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
var (
ms = mockScheduler{}
api = New(&ms)
rec = httptest.NewRecorder()
req = httptest.NewRequest("POST", tt.giveURL, strings.NewReader(tt.giveBody))
)
api.ServeHTTP(rec, req)
res := rec.Result()
defer res.Body.Close()
assertEq(t, res.StatusCode, tt.wantStatus)
assertEq(t, rec.Body.String(), tt.wantResBody)
assertEq(t, ms.ch, tt.wantChan)
assertEq(t, string(ms.buf), tt.giveBody)
})
}
}

type mockScheduler struct {
ch string
buf []byte
}

func (m *mockScheduler) Schedule(_ context.Context, chn string, data []byte) error {
m.ch = chn
m.buf = data
return nil
}

func assertEq[T any](t *testing.T, got, want T) {
t.Helper()
if !reflect.DeepEqual(got, want) {
t.Errorf("got %v, want %v", got, want)
}
}

0 comments on commit 806fde4

Please sign in to comment.