Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add health check package and update SQS receive func #178

Merged
merged 4 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions aws/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type SQS struct {
client *sqs.Client
}

// ErrNoMessages is the specific error returned when no messages are
// received from the queue. Compare against it with errors.Is.
var ErrNoMessages = errors.New("no messages received from queue")

// New returns an SQS struct which wraps an SQS client using the default AWS credentials chain.
// This consults (in order) environment vars, config files, EC2 and ECS roles.
// It is an error if the AWS_REGION environment variable is not set.
Expand Down Expand Up @@ -125,29 +128,28 @@ func (s *SQS) ReceiveWithContextAttributes(ctx context.Context, queueURL string,
// receiveMessage is the common code used internally to receive an SQS message based
// on the provided input.
func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput) (Raw, error) {
r, err := s.client.ReceiveMessage(ctx, input)
if err != nil {
return Raw{}, err
}

for {
r, err := s.client.ReceiveMessage(ctx, input)
if err != nil {
return Raw{}, err
}
switch {
case r == nil || len(r.Messages) == 0:
// no message received
return Raw{}, ErrNoMessages

switch {
case r == nil || len(r.Messages) == 0:
// no message received
continue
case len(r.Messages) == 1:
raw := r.Messages[0]

m := Raw{
Body: aws.ToString(raw.Body),
ReceiptHandle: aws.ToString(raw.ReceiptHandle),
Attributes: raw.Attributes,
}
return m, nil
case len(r.Messages) > 1:
return Raw{}, fmt.Errorf("received more than 1 message: %d", len(r.Messages))
case len(r.Messages) == 1:
raw := r.Messages[0]

m := Raw{
Body: aws.ToString(raw.Body),
ReceiptHandle: aws.ToString(raw.ReceiptHandle),
Attributes: raw.Attributes,
}
return m, nil

default:
return Raw{}, fmt.Errorf("received unexpected messages: %d", len(r.Messages))
}
}

Expand Down Expand Up @@ -333,3 +335,7 @@ func Cancelled(err error) bool {
}
return false
}

// IsNoMessagesError reports whether err is ErrNoMessages, or wraps it
// somewhere in its error chain.
func IsNoMessagesError(err error) bool {
return errors.Is(err, ErrNoMessages)
}
68 changes: 68 additions & 0 deletions health/check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package health

import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)

// Check performs an HTTP GET against the given service endpoint and returns
// the response body of a successful check.
//
// servicePath may be a full URL ("https://host/soh") or a scheme-less
// address ("host:8080/soh", ":8080/soh"); in the latter case "http://" is
// prepended. The request is bound to ctx, and timeout is applied as the
// overall HTTP client timeout (zero means no client timeout).
//
// An error is returned if the URL cannot be parsed, the request fails, the
// body cannot be read, or the response status is not 200 (StatusOK). For a
// non-200 status the error text is the response body followed by the status
// text in parentheses.
func Check(ctx context.Context, servicePath string, timeout time.Duration) ([]byte, error) {
	// Only treat the path as a complete URL when it really starts with an
	// HTTP scheme. Testing for a bare "http" prefix would misclassify hosts
	// such as "httpbin.org", and schemes are case-insensitive.
	checkURL := servicePath
	lower := strings.ToLower(servicePath)
	if !strings.HasPrefix(lower, "http://") && !strings.HasPrefix(lower, "https://") {
		checkURL = "http://" + servicePath
	}

	target, err := url.Parse(checkURL)
	if err != nil {
		return nil, err
	}

	client := &http.Client{
		Timeout: timeout,
	}

	request, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil)
	if err != nil {
		return nil, err
	}

	resp, err := client.Do(request)
	if err != nil {
		return nil, err
	}
	defer func() {
		// Drain whatever is left so the underlying connection can be reused.
		_, _ = io.Copy(io.Discard, resp.Body)
		_ = resp.Body.Close()
	}()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("%s (%s)", string(body), http.StatusText(resp.StatusCode))
	}

	return body, nil
}

// CheckStatus runs a Check against the given service and converts the
// outcome into an exit-code style integer: zero for a healthy service and
// one otherwise.
//
// servicePath is the service address and path to check, e.g. ":8080/soh".
// timeout is used both as the deadline of the context handed to Check and
// as the timeout argument of Check itself.
func CheckStatus(servicePath string, timeout time.Duration) int {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	_, err := Check(ctx, servicePath, timeout)
	if err == nil {
		return 0
	}

	return 1
}
CallumNZ marked this conversation as resolved.
Show resolved Hide resolved
79 changes: 79 additions & 0 deletions health/health_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package health_test

import (
"context"
"log"
"testing"
"time"

"github.com/GeoNet/kit/health"
)

// Shared settings for the health tests.
var (
healthCheckAged = 5 * time.Second // a heartbeat must have been seen within this time
healthCheckStartup = 5 * time.Second // period after starting during which heartbeats are ignored
healthCheckTimeout = 30 * time.Second // timeout for a single health check call
healthCheckService = ":7777" // address the service listens on for SOH checks
healthCheckPath = "/soh" // path of the SOH endpoint
)

// TestExistingSoh checks a live, publicly hosted SOH endpoint. It needs
// outbound network access, so it is skipped when tests run with -short.
func TestExistingSoh(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping check against external endpoint in short mode")
	}

	checkPath := "https://api.geonet.org.nz/soh"
	if err := healthCheck(checkPath); err != nil {
		t.Errorf("should pass health check for %s: %v", checkPath, err)
	}
}

// TestHealth walks a health Service through its life cycle: not yet
// started, freshly started, gone stale, and revived by a heartbeat.
func TestHealth(t *testing.T) {
	checkPath := healthCheckService + healthCheckPath

	// 1. Nothing is listening yet, so the check must fail.
	if err := healthCheck(checkPath); err == nil {
		t.Error("should fail health check at start")
	}

	// 2. Start the service and send a first heartbeat.
	// The variable is deliberately not called "health" so that it does not
	// shadow the imported package.
	svc := health.New(healthCheckService, healthCheckAged, healthCheckStartup)
	svc.Ok()
	time.Sleep(1 * time.Millisecond) // give the listener goroutine a moment to start
	if err := healthCheck(checkPath); err != nil {
		t.Errorf("should pass health check after started: %v", err)
	}

	// 3. Without heartbeats for healthCheckAged the check must fail.
	time.Sleep(healthCheckAged)
	if err := healthCheck(checkPath); err == nil {
		t.Errorf("should fail health check after %v", healthCheckAged)
	}

	// 4. A new heartbeat makes the service healthy again.
	svc.Ok()
	if err := healthCheck(checkPath); err != nil {
		t.Errorf("should pass health check after heartbeat: %v", err)
	}
}

func TestHealthWithoutAgeCheck(t *testing.T) {
healthCheckAged = 0 * time.Second
healthCheckService = ":7778"
checkPath := healthCheckService + healthCheckPath
//1. start the process
health := health.New(healthCheckService, healthCheckAged, healthCheckStartup)
health.Ok()
time.Sleep(1 * time.Millisecond) //let the service to start
if err := healthCheck(checkPath); err != nil {
t.Error("should pass health check after started ")
}

//2. test after healthCheckAged
time.Sleep(5 * time.Second) //wait for 5 seconds
if err := healthCheck(checkPath); err != nil {
t.Error("should pass health check after 5 seconds", err)
}
}

// healthCheck calls the HTTP SOH endpoint at sohPath, logs the reply it
// received, and returns the error (if any) from the check.
func healthCheck(sohPath string) error {
	ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout)
	defer cancel()

	reply, checkErr := health.Check(ctx, sohPath, healthCheckTimeout)
	log.Printf("status: %s", reply)

	return checkErr
}
159 changes: 159 additions & 0 deletions health/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package health

import (
"context"
"log"
"net/http"
"sync"
"time"
)

// CheckPath is the fixed URL path on which the SOH endpoint is served.
const CheckPath = "/soh"
CallumNZ marked this conversation as resolved.
Show resolved Hide resolved

// Service provides a mechanism to update a service SOH status.
type Service struct {
// mu is held while status and last are updated.
mu sync.Mutex

// status indicates whether the service is running as expected.
status bool
// last stores the time of the last update.
last time.Time

// start stores when the service was started.
start time.Time
// aged is the longest time allowed since the last update before the
// service is reported as no longer running.
// The default zero value means no age check is required.
aged time.Duration
// startup is the period after start during which the check is assumed
// to be successful.
startup time.Duration
}

// New returns a health Service which provides running SOH capabilities.
func New(endpoint string, aged, startup time.Duration) *Service {
service := &Service{
aged: aged,
last: time.Now(),
start: time.Now(),
startup: startup,
}

router := http.NewServeMux()
router.HandleFunc(CheckPath, service.handler)

srv := &http.Server{
Addr: endpoint,
Handler: router,
ReadHeaderTimeout: 2 * time.Second,
}
CallumNZ marked this conversation as resolved.
Show resolved Hide resolved

go func() {
if err := srv.ListenAndServe(); err != nil {
log.Println("error starting health check service", err)
}
}()

return service
}

// state reports the current application state while holding the lock.
// It is expected to grow as further checks are added.
func (s *Service) state() bool {
	s.mu.Lock()
	current := s.status
	s.mu.Unlock()

	return current
}

func (s *Service) handler(w http.ResponseWriter, r *http.Request) {
ok := s.state()
switch {
case time.Since(s.start) < s.startup:
// Avoid terminating before initial check period
w.WriteHeader(http.StatusOK)
CallumNZ marked this conversation as resolved.
Show resolved Hide resolved
if _, err := w.Write([]byte("warn")); err != nil {
log.Println("error writing response", err)
}
case ok && (s.aged == 0 || time.Since(s.last) < s.aged):
// Service is OK and actively updating
w.WriteHeader(http.StatusOK)
if _, err := w.Write([]byte("ok")); err != nil {
log.Println("error writing response", err)
}
default:
// Service is not OK or has stopped updating
w.WriteHeader(http.StatusInternalServerError)
if _, err := w.Write([]byte("fail")); err != nil {
log.Println("error writing response", err)
}
}
}

// Ok updates the Service to indicate the service is running as expected.
// It is shorthand for Update(true).
func (s *Service) Ok() {
s.Update(true)
}

// Fail updates the Service to indicate the service is not running as expected.
// It is shorthand for Update(false).
func (s *Service) Fail() {
s.Update(false)
}

// Update records the given status on the Service and stamps the current
// time as the moment of the last update. Both are written under the lock.
func (s *Service) Update(status bool) {
	s.mu.Lock()
	s.status, s.last = status, time.Now()
	s.mu.Unlock()
}

// Alive allows an application to perform a complex task while still sending
// heartbeats. A background goroutine marks the service Ok immediately and
// then once per heartbeat interval until ctx is done or the returned
// CancelFunc is called.
func (s *Service) Alive(ctx context.Context, heartbeat time.Duration) context.CancelFunc {
	ctx, cancel := context.WithCancel(ctx)

	go s.beat(ctx, cancel, heartbeat)

	return cancel
}

// Pause allows an application to stall for a set period of time while still
// sending heartbeats. It behaves like Alive, except that the heartbeats also
// stop once deadline has elapsed.
func (s *Service) Pause(ctx context.Context, deadline, heartbeat time.Duration) context.CancelFunc {
	ctx, cancel := context.WithTimeout(ctx, deadline)

	go s.beat(ctx, cancel, heartbeat)

	return cancel
}

// beat marks the service Ok once straight away and then on every tick of
// heartbeat until ctx is done. cancel is called on exit to release the
// resources held by ctx.
func (s *Service) beat(ctx context.Context, cancel context.CancelFunc, heartbeat time.Duration) {
	defer cancel()

	ticker := time.NewTicker(heartbeat)
	defer ticker.Stop()

	s.Ok()

	for {
		select {
		case <-ticker.C:
			s.Ok()
		case <-ctx.Done():
			return
		}
	}
}
Comment on lines +111 to +159
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the use case for these? They might be unneeded complexity.
If the application has a long-running process, wouldn't you just adjust `aged`?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are currently not used, and I don't know whether Mark has a use case for them. I would keep them in case we need them later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could Pause() while you wait to receive a message, for example

Loading