Skip to content

Commit

Permalink
Feature: add events handler
Browse files Browse the repository at this point in the history
* add events pubsub.
* add events handler.
* move http handlers to handlers package.
* add scripts directory.
* update Taskfile.
  • Loading branch information
rugwirobaker committed Oct 7, 2020
1 parent 830ec3a commit 2f6263f
Show file tree
Hide file tree
Showing 20 changed files with 586 additions and 96 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
Helmes is a demo 12 factor app in Go that can be deployed on any cloud native compliant platform as a container.
Helmes as the name obviously implies is a messenger that Sends SMS messages to any supported carrier in Rwanda.

## Goals

- [x] Simple sms delivery.
- [x] Delivery notifications.
- [ ] Implement a simple store(badger) to record access records.
- [ ] Document authentication via [pomerium.io](https://www.pomerium.io/).
- [ ] Add application metrics.
- [ ] Build and deploy container imag.
- [ ] Add valid knative deployment manifests

## Environment variables
To function Helmes requires a couple of environment variables:

Expand Down Expand Up @@ -69,6 +79,13 @@ Finally send the payload as defined in `helmes.json`
curl -d "@helmes.json" -H "Content-Type: application/json" -X POST localhost:$PORT/api/send
```

There is an notifications endpoint `api/events/$ID/status` you could subscribe to to recieve
sms delivery notications. Therss is a helping script you could use to run an example:

```
./scripts/send.sh
```

6. You can build a docker image
```
task image
Expand Down
3 changes: 0 additions & 3 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ tasks:

build:
desc: Build the application
deps: [generate]
cmds:
- echo "compiling binary..."
- CGO_ENABLED=0 go build -ldflags="{{.LDFLAGS}}" -o bin/helmes ./cmd/helmes
Expand Down Expand Up @@ -46,7 +45,5 @@ tasks:

test:
desc: Run unit tests
deps: [generate]
cmds:
- go generate ./...
- go test -race -covermode=atomic ./...
9 changes: 6 additions & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import (

// Server ...
type Server struct {
Events helmes.Pubsub
Service helmes.SendService
}

// New api Server instance
func New(svc helmes.SendService) *Server {
return &Server{Service: svc}
func New(svc helmes.SendService, events helmes.Pubsub) *Server {
return &Server{Service: svc, Events: events}
}

// Handler returns an http.Handler
Expand All @@ -32,9 +33,11 @@ func (s Server) Handler() http.Handler {
w.Write([]byte("Welcome to helmes"))
})

r.Get("/healthz", handlers.HealthHandler())
r.Get("/version", handlers.VersionHandler())
r.Get("/healthz", handlers.HealthHandler())
r.Post("/send", handlers.SendHandler(s.Service))
r.Get("/events/{id}/status", handlers.SubscribeHandler(s.Events))
r.Post("/delivery", handlers.DeliveryHandler(s.Events))

return r
}
26 changes: 26 additions & 0 deletions api/handlers/delivery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package handlers

import (
"encoding/json"
"log"
"net/http"

"github.com/rugwirobaker/helmes"
)

// DeliveryHandler handles delivery callback reception
func DeliveryHandler(events helmes.Pubsub) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {

in := new(event)

if err := json.NewDecoder(r.Body).Decode(in); err != nil {
log.Printf("failed to serialize request")
JSON(w, NewError(err.Error()), 500)
return
}
events.Publish(r.Context(), convertEvent(in))

JSON(w, map[string]string{"status": "ok"}, http.StatusOK)
}
}
15 changes: 15 additions & 0 deletions api/handlers/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package handlers

//Error ...
type Error struct {
Message string `json:"message"`
}

func (e Error) Error() string {
return e.Message
}

// NewError creates a new error instance
func NewError(text string) error {
return &Error{Message: text}
}
74 changes: 74 additions & 0 deletions api/handlers/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package handlers

import (
"context"
"log"
"net/http"
"time"

"github.com/go-chi/chi"
"github.com/rugwirobaker/helmes"
)

// SubscribeHandler handles user subscriptions to delivery notifications
func SubscribeHandler(events helmes.Pubsub) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")

h := w.Header()
h.Set("Content-Type", "text/event-stream")
h.Set("Cache-Control", "no-cache")
h.Set("Connection", "keep-alive")
h.Set("X-Accel-Buffering", "no")

f, ok := w.(http.Flusher)
if !ok {
log.Println("could not start stream")
return
}

ctx, cancel := context.WithCancel(r.Context())
defer cancel()

event, err := events.Subscribe(ctx, id)
if err != nil {
log.Println(err)
Flush(w, f, NewError(err.Error()))
return
}

for {
select {
case <-ctx.Done():
log.Println("event: stream canceled")
Flush(w, f, NewError("context canceled"))
return

case <-time.After(time.Second * 10):
log.Println("event: stream timeout")
Flush(w, f, NewError("connection timeout"))
return

case res := <-event:
Flush(w, f, res)
events.Done(ctx, res.ID)
return
}
}
}
}

type event struct {
MsgRef string `json:"msgRef"`
Recipient string `json:"recipient"`
GatewayRef string `json:"gatewayRef"`
Status int `json:"status"`
}

func convertEvent(event *event) helmes.Event {
return helmes.Event{
ID: event.MsgRef,
Recipient: event.Recipient,
Status: helmes.St(event.Status),
}
}
64 changes: 0 additions & 64 deletions api/handlers/handlers.go

This file was deleted.

82 changes: 82 additions & 0 deletions api/handlers/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package handlers_test

import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"sync"
"testing"

"github.com/go-chi/chi"
"github.com/golang/mock/gomock"
"github.com/google/go-cmp/cmp"
"github.com/rugwirobaker/helmes"
Expand All @@ -23,6 +26,17 @@ var (
ID: "message id",
Cost: 1,
}
dummyEvent = &helmes.Event{
ID: "fake_id",
Status: helmes.St(1),
Recipient: "078xxxxxxx",
}
dummyCallback = &callback{
MsgRef: "fake_id",
Recipient: "078xxxxxxx",
GatewayRef: "xxxxx",
Status: 1,
}
)

func TestSendHander(t *testing.T) {
Expand Down Expand Up @@ -79,3 +93,71 @@ func TestVersionHandler(t *testing.T) {
t.Errorf(diff)
}
}

func TestSubscribeHandler(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

mockEvent := make(chan helmes.Event)

wg := sync.WaitGroup{}

wg.Add(1)
go func() {
mockEvent <- *dummyEvent
wg.Done()
}()

ps := mock.NewMockPubsub(controller)
ps.EXPECT().Subscribe(gomock.Any(), "fake_id").Return(mockEvent, nil)
ps.EXPECT().Done(gomock.Any(), "fake_id").Return(nil)

c := new(chi.Context)
c.URLParams.Add("id", "fake_id")

w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/", nil)
r = r.WithContext(
context.WithValue(context.Background(), chi.RouteCtxKey, c),
)

handlers.SubscribeHandler(ps).ServeHTTP(w, r)

if got, want := w.Code, 200; want != got {
t.Errorf("Want response code %d, got %d", want, got)
}

got, want := &helmes.Event{}, dummyEvent
json.NewDecoder(w.Body).Decode(got)
if diff := cmp.Diff(got, want); len(diff) != 0 {
t.Errorf(diff)
}
}

func TestDeliveryHandler(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

ps := mock.NewMockPubsub(controller)
ps.EXPECT().Publish(gomock.Any(), gomock.Any())

in := new(bytes.Buffer)

_ = json.NewEncoder(in).Encode(dummyCallback)

w := httptest.NewRecorder()
r := httptest.NewRequest("POST", "/", in)

handlers.DeliveryHandler(ps).ServeHTTP(w, r)

if got, want := w.Code, 200; want != got {
t.Errorf("Want response code %d, got %d", want, got)
}
}

type callback struct {
MsgRef string `json:"msgRef"`
Recipient string `json:"recipient"`
GatewayRef string `json:"gatewayRef"`
Status int `json:"status"`
}
21 changes: 21 additions & 0 deletions api/handlers/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package handlers

import (
"net/http"
"runtime"
"time"

helmes "github.com/rugwirobaker/helmes"
)

// HealthHandler reports the health of the application
func HealthHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
res := &helmes.Health{
GitRev: helmes.Data().Version,
Uptime: time.Since(startTime).Seconds(),
Goroutines: runtime.NumGoroutine(),
}
JSON(w, res, http.StatusOK)
}
}
Loading

0 comments on commit 2f6263f

Please sign in to comment.